[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166394747
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I'm not saying that `DataSourceOptions` has to be handled in the relation, just 
that the relation should use the same classes to pass data, like 
`TableIdentifier`, that are used by the rest of the planner. I agree with the 
benefits of doing this.

Is there anything that needs to change in this PR? We can move where the 
options are created in a follow-up, but let me know if you think this would 
prevent this from getting merged.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166386661
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I think that `TableIdentifier` and a string-to-string `Map` should be 
passed to `DataSourceV2Relation`, and that either the relation or 
`DataSourceOptions` should be responsible for creating `DataSourceOptions` with 
well-defined properties to pass the table information to implementations.

This minimizes the number of places that need to handle `DataSourceOptions` 
(which is specific to v2) and uses `TableIdentifier` and `Map` to match the 
rest of the planner nodes. For example, other read paths that can create 
`DataSourceV2Relation`, like resolution rules, use `TableIdentifier`.

I'm not currently advocating a position on how to configure 
`DataFrameReader` or `DataFrameWriter` or how to handle schemas.


---




[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
@cloud-fan, this is a single commit on purpose because predicate push-down 
makes plan changes. I think it's best to do these at once to avoid unnecessary 
work. That's why I started looking more closely at push-down in the first 
place: updating the other push-down code for immutable plans was a mess.

I also think it is unlikely that we will need to revert the push-down 
changes here. If we end up redesigning push-down, then it is unlikely that the 
easiest starting point is to roll back this fix.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166381800
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
   writeTask: DataWriterFactory[InternalRow],
   context: TaskContext,
   iter: Iterator[InternalRow]): WriterCommitMessage = {
-val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
+val stageId = context.stageId()
+val partId = context.partitionId()
+val attemptId = context.attemptNumber()
+val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
 // write the data and commit this writer.
 Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
   iter.foreach(dataWriter.write)
-  logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
-  val msg = dataWriter.commit()
-  logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+  val msg = if (writeTask.useCommitCoordinator) {
+val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

What do you have in mind to "introduce the concept"?

I'm happy to add more docs. Do you want me to add them to this PR or in a 
follow-up? Are you targeting this for 2.3.0?


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166360278
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
   writeTask: DataWriterFactory[InternalRow],
   context: TaskContext,
   iter: Iterator[InternalRow]): WriterCommitMessage = {
-val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
+val stageId = context.stageId()
+val partId = context.partitionId()
+val attemptId = context.attemptNumber()
+val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
 // write the data and commit this writer.
 Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
   iter.foreach(dataWriter.write)
-  logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
-  val msg = dataWriter.commit()
-  logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+  val msg = if (writeTask.useCommitCoordinator) {
+val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

The API is flexible. The problem is that it defaults to no coordination, 
which causes correctness bugs.

The safe option is to coordinate commits by default. If an implementation 
doesn't change the default, then it at least won't duplicate task outputs in 
job commit. Worst case is that it takes a little longer for committers that 
don't need coordination. On the other hand, not making this the default will 
cause some writers to work most of the time, but duplicate data in some cases.

What do you think is the down side to adding this?
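
To make the trade-off concrete, here is a minimal sketch of the coordinated 
check. It mirrors the diff above and assumes the 2.3-era 
`canCommit(stageId, partitionId, attemptNumber)` signature; it only works 
inside Spark's own write path since the coordinator is internal API, so treat 
the details as approximate:

```scala
import org.apache.spark.{SparkEnv, TaskContext}

// Sketch only: ask the driver-side coordinator whether this task attempt may
// commit. With coordination on by default, a retried or speculative attempt
// that loses the race is denied instead of double-committing its output.
def commitAuthorized(context: TaskContext): Boolean = {
  val coordinator = SparkEnv.get.outputCommitCoordinator
  coordinator.canCommit(context.stageId(), context.partitionId(), context.attemptNumber())
}
```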


---




[GitHub] spark pull request #20495: [SPARK-23327] [SQL] Update the description and te...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20495#discussion_r166358420
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ---
@@ -1655,15 +1655,17 @@ case class Left(str: Expression, len: Expression, 
child: Expression) extends Run
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(expr) - Returns the character length of `expr` or number 
of bytes in binary data.",
+  usage = "_FUNC_(expr) - Returns the character length of `expr` or number 
of bytes in binary data. " +
--- End diff --

+1 for string data / binary data


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166165255
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+"DataSourceV2Relation(" +
+  s"source=$sourceName${path.orElse(table).map(loc => 
s"($loc)").getOrElse("")}, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}] options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+path match {
+  case Some(p) =>
+updatedOptions.put("path", p)
+  case None =>
+updatedOptions.remove("path")
+}
+
+table.map { ident =>
+  updatedOptions.put("table", ident.table)
--- End diff --

Opened [SPARK-23341](https://issues.apache.org/jira/browse/SPARK-23341) for 
this.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166165005
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+"DataSourceV2Relation(" +
+  s"source=$sourceName${path.orElse(table).map(loc => 
s"($loc)").getOrElse("")}, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}] options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+path match {
+  case Some(p) =>
+updatedOptions.put("path", p)
+  case None =>
+updatedOptions.remove("path")
+}
+
+table.map { ident =>
+  updatedOptions.put("table", ident.table)
--- End diff --

I think we agree here. I want to avoid doing this outside of either 
`DataSourceOptions` or `DataSourceV2Relation`. If we can create 
`DataSourceOptions` from an `Option[TableIdentifier]` and add the `getTable` 
accessor, then that works for me. My main motivation is to avoid having this 
piece of code copied throughout the SQL planner.
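
As a sketch of the single conversion point I mean (the helper below is 
hypothetical, it does not exist in this PR, and the property names are 
illustrative only):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Hypothetical helper: one place that knows which property names carry the
// table identity, so callers never build these maps by hand.
object V2OptionsFactory {
  def create(table: Option[TableIdentifier], options: Map[String, String]): DataSourceOptions = {
    val withTable = options ++
      table.map(t => "table" -> t.table) ++
      table.flatMap(_.database).map(db => "database" -> db)
    new DataSourceOptions(withTable.asJava)
  }
}
```

A `getTable` accessor on `DataSourceOptions` would be the mirror image on the 
read side.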


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20490
  
I've updated this to no longer require #20387. It wasn't relying on those 
changes at all. @gatorsmile, @cloud-fan, what do you think about getting this 
into 2.3.0?


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166081367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I should clarify this as well: I like the idea of standardizing how 
implementations access the table name from `DataSourceOptions`. However, I 
don't think that is sufficient to remove the table and path here in the 
definition of `DataSourceV2Relation` for the reasons above.

I think we *should* follow this commit with a plan for how implementations 
access these options. For now, it is good to put the creation of those options 
in a single place.


---




[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> For safety, I wanna keep it unchanged, and start something new for data 
source v2 only.

I disagree.

* **#20476 addresses a bug caused by the new implementation that is not a 
problem if we reuse the current push-down code.** Using an entirely new 
implementation to push filters and projection is going to introduce bugs, and 
that problem demonstrates that it is a real risk.
* **Using unreliable push-down code is going to make it more difficult for 
anyone to use the v2 API.**
* **This approach throws away work that has accumulated over the past few 
years and gives us confidence in the current push-down code.** The other code 
paths have push-down tests that will help us catch bugs in the new push-down 
logic. If we limit the scope of this change to v2, we will not be able to reuse 
those tests and will have to write entirely new ones that cover all cases.

Lastly, I think it is clear that we need a design for a new push-down 
mechanism. **Adding this to DataSourceV2 as feature creep is not a good way to 
redesign it.** I'd like to see a design document that addresses some of the 
open questions.

I'd also prefer that this new implementation be removed from the v2 code 
path for 2.3.0. @marmbrus, what do you think?


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166030958
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

@cloud-fan, sorry if it was not clear: Yes, I have considered it and I 
think it is a bad idea. I sent a note to the dev list about this issue, as well 
if you want more context. There are two main reasons:

1. Your proposal creates more places that are responsible for creating a 
`DataSourceOptions` with the right property names. All of the places where we 
have a `TableIdentifier` and want to convert to a `DataSourceV2Relation` need 
to copy the same logic and worry about using the same properties.
What you propose is hard to maintain and error prone: what happens if 
we decide not to pass the database if it is `None` in a `TableIdentifier`? We 
would have to validate every place that creates a v2 relation. On the other 
hand, if we pass `TableIdentifier` here, we have one code path that converts. 
It is also easier for us to pass `TableIdentifier` to the data sources if we 
choose to update the API.
2. There is no reason to use `DataSourceOptions` outside of v2 at this 
point. This PR doesn't expose the v2-specific options class to other places in 
the codebase. Instead, it uses a map for generic options and classes that can 
be used in pattern matching where possible. And again, this has fewer places 
that create v2 internal classes, which is easier for maintenance.

If you want to add those methods to the options class so that 
implementations can easily access path and table name, then we can do that in a 
follow-up PR.


---




[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> Why pushdown is happening in logical optimization and not during query 
planning. My first instinct would be to have the optimizer get operators as 
close to the leaves as possible and then fuse (or push down) as we convert to 
physical plan. I'm probably missing something.

I think there are two reasons, but I'm not fully convinced by either one:

* 
[`computeStats`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L232)
 is defined on logical plans, so the result of filter push-down needs to be a 
logical plan if we want to be able to use accurate stats for a scan. I'm 
interested here in ensuring that we correctly produce broadcast relations based 
on the actual scan stats, not the table-level stats (see the sketch after this 
list). Maybe there's another way to do this?
* One of the tests for DSv2 ends up invoking the push-down rule twice, 
which made me think about whether or not that should be valid. I think it 
probably should be. For example, what if a plan has nodes that can all be 
pushed, but they aren't in the right order? Or what if a projection wasn't 
pushed through a filter because of a rule problem, but it can still be pushed 
down? Incremental fusing during optimization might be an extensible way to 
handle odd cases, or it may be useless. I'm not quite sure yet.

It would be great to hear your perspective on these.
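
On the first point, this is roughly the shape of the relation's stats hook; 
after push-down the reader reflects the pruned and filtered scan, so 
`sizeInBytes` is scan-level rather than table-level:

```scala
// Roughly the relation's computeStats: delegate to the reader, which reports
// statistics for the actual scan once push-down has configured it.
override def computeStats(): Statistics = reader match {
  case r: SupportsReportStatistics =>
    Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
  case _ =>
    Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
```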


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

2018-02-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20490
  
@dongjoon-hyun, @cloud-fan, @gatorsmile. Once the immutable plan PR is in, 
this can be reviewed.

@steveloughran, I think this is what you were asking for.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Add support for commit coordi...

2018-02-02 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/20490

[SPARK-23323][SQL]: Add support for commit coordinator for DataSourceV2 
writes

## What changes were proposed in this pull request?

DataSourceV2 batch writes should use the output commit coordinator if it is 
required by the data source. This adds a new method, 
`DataWriterFactory#useCommitCoordinator`, that determines whether the 
coordinator will be used. If the write factory returns true, 
`WriteToDataSourceV2` will use the coordinator for batch writes.
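
For illustration, a sink that already arbitrates its own commits could opt out 
like this; this is a sketch against the interface described above, with the 
default of `true` keeping the coordinator in the loop:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}

// Sketch: a factory for a sink that performs its own commit arbitration.
// Leaving the default (true) keeps Spark's coordinator in the loop.
class SelfCoordinatingWriterFactory extends DataWriterFactory[InternalRow] {
  override def useCommitCoordinator: Boolean = false

  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] =
    ???  // construct the sink-specific writer here
}
```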

This relies on the commits in #20387. Once that is committed, this will be 
rebased. Only the last commit is part of this PR.

## How was this patch tested?

This relies on existing write tests, which now use the commit coordinator.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-23323-add-commit-coordinator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20490


commit 62c569672083c0fa633da1d6edaba40d0bb05819
Author: Ryan Blue <blue@...>
Date:   2018-01-17T21:58:12Z

SPARK-22386: DataSourceV2: Use immutable logical plans.

commit f0bd45d3c931941b8092cdac738cb29954e0acdd
Author: Ryan Blue <blue@...>
Date:   2018-01-24T19:34:42Z

SPARK-23203: Fix scala style check.

commit 2fdeb4556cd22a092630b341a22a16a59e377183
Author: Ryan Blue <blue@...>
Date:   2018-01-24T19:54:10Z

SPARK-23203: Fix Kafka tests, use StreamingDataSourceV2Relation.

This also removes unused imports.

commit ab945a19efe666c41deae9c044002f3455220c1d
Author: Ryan Blue <blue@...>
Date:   2018-02-02T20:30:33Z

SPARK-23204: DataFrameReader: Remove v2 table identifier parsing.

commit f1d9872a2699cdbd5c87b02e702dc8103335131d
Author: Ryan Blue <blue@...>
Date:   2018-02-02T21:48:29Z

SPARK-23203: Remove import changes from DataSourceV2Utils.

commit 288af6a2729c769e0d4075a8f9190958ab5a211c
Author: Ryan Blue <blue@...>
Date:   2018-02-02T22:21:48Z

SPARK-23323: DataSourceV2: support commit coordinator.




---




[GitHub] spark pull request #20488: [SPARK-23321][SQL]: Validate datasource v2 writes

2018-02-02 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/20488

[SPARK-23321][SQL]: Validate datasource v2 writes

## What changes were proposed in this pull request?

DataSourceV2 does not currently apply any validation rules when writing. 
Other write paths attempt to validate that a data frame can be written to a 
target table or path and these changes add the same logic to v2.

This updates the logical plan to use InsertIntoTable and applies the insert 
preprocess rules to writes. It also adds a conversion rule from InsertIntoTable 
to DataSourceV2Write because InsertIntoTable cannot be used in logical plans 
after analysis.

InsertIntoTable is not necessarily the right logical plan. It assumes that 
the table exists and can report its schema.
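
A very rough sketch of that conversion rule; the writer construction is a 
placeholder and the write node's arguments are assumptions, not this PR's exact 
code:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter

object ConvertV2Inserts extends Rule[LogicalPlan] {
  // Placeholder: however the relation's source builds a writer for the insert.
  private def newWriter(relation: DataSourceV2Relation): DataSourceWriter = ???

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Once the preprocess rules have validated the insert, replace it with the
    // v2 write node so InsertIntoTable does not survive past analysis.
    case insert: InsertIntoTable if insert.table.isInstanceOf[DataSourceV2Relation] =>
      val relation = insert.table.asInstanceOf[DataSourceV2Relation]
      WriteToDataSourceV2(newWriter(relation), insert.query)
  }
}
```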

## How was this patch tested?

Added a test that fails analysis in the preprocess rule.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-23321-validate-datasource-v2-writes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20488


commit 62c569672083c0fa633da1d6edaba40d0bb05819
Author: Ryan Blue <blue@...>
Date:   2018-01-17T21:58:12Z

SPARK-22386: DataSourceV2: Use immutable logical plans.

commit f0bd45d3c931941b8092cdac738cb29954e0acdd
Author: Ryan Blue <blue@...>
Date:   2018-01-24T19:34:42Z

SPARK-23203: Fix scala style check.

commit 2fdeb4556cd22a092630b341a22a16a59e377183
Author: Ryan Blue <blue@...>
Date:   2018-01-24T19:54:10Z

SPARK-23203: Fix Kafka tests, use StreamingDataSourceV2Relation.

This also removes unused imports.

commit ab945a19efe666c41deae9c044002f3455220c1d
Author: Ryan Blue <blue@...>
Date:   2018-02-02T20:30:33Z

SPARK-23204: DataFrameReader: Remove v2 table identifier parsing.

commit 3580daf15497a1d49112a0eddd556f74b9b3e280
Author: Ryan Blue <blue@...>
Date:   2018-02-02T19:04:23Z

SPARK-23321: Apply preprocess insert rules to DataSourceV2.

This updates the DataSourceV2 write path to use DataSourceV2Relation and
InsertIntoTable to apply the insert preprocess rules.




---




[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
@cloud-fan, @dongjoon-hyun, @gatorsmile, I've rebased this on master and 
removed the support for SPARK-23204 that parses table identifiers. If you need 
other changes to get this in, let me know. As far as I'm aware, this isn't 
targeting 2.3.0 so it makes sense to keep the `PhysicalOperation` push-down 
rules.


---




[GitHub] spark pull request #20477: [SPARK-23303][SQL] improve the explain result for...

2018-02-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20477#discussion_r165728696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -36,11 +38,14 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
 fullOutput: Seq[AttributeReference],
-@transient reader: DataSourceReader)
+@transient reader: DataSourceReader,
+@transient sourceClass: Class[_ <: DataSourceV2])
   extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2ScanExec]
 
+  override def simpleString: String = s"Scan $metadataString"
--- End diff --

+1 for overriding nodeName.


---




[GitHub] spark issue #20485: [SPARK-23315][SQL] failed to get output from canonicaliz...

2018-02-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20485
  
Sounds fine to me, then.

My focus is on the long-term design issues. I still think that the changes 
to make plans immutable and to use the existing push-down code as much as 
possible are the best way to get a reliable 2.3.0, but it is fine if they don't 
make the release.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-02-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
@cloud-fan, I'll update this PR and we can talk about passing configuration 
on the dev list.

And as a reminder, please close #20445.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-02-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> I tried and can't figure out how to do it with PhysicalOperation, that's 
why I build something new for data source v2 pushdown.

The problem is that we should get DSv2 working independently of a redesign 
of the push-down rules. Throwing an untested push-down rule into changes for 
DSv2 makes the new API less reliable, and hurts people that want to try it out 
and start using it. There is no benefit to doing this for 2.3.0.

I also think a redesign of push-down should be properly designed, thought 
out, and tested. I'm all for fixing this if you can make the case that we need 
to, but we shouldn't needlessly mix together major changes.

@cloud-fan, There's more discussion about this on #20476 that I encourage 
you to read.


---




[GitHub] spark issue #20485: [SPARK-23315][SQL] failed to get output from canonicaliz...

2018-02-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20485
  
To be clear, the purpose of this commit, like #20476, is just to get 
something working for the 2.3.0 release?

I just want to make sure since I think we should be approaching these 
problems with a better initial design for the integration. I'm fine getting 
this in to unblock a release, but if it isn't for that purpose then I think we 
should fix the design problems first.


---




[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...

2018-02-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20476
  
Yeah, I did review it, but at the time I wasn't familiar with how the other 
code paths worked and assumed that it was necessary to introduce this. I wasn't 
very familiar with how it *should* work, so I didn't +1 it.

There are a few telling comments though:

> How do we know that there aren't more cases that need to be supported?

> What are the guarantees made by the previous batches in the optimizer? 
The work done by FilterAndProject seems redundant to me because the optimizer 
should already push filters below projection. Is that not guaranteed by the 
time this runs?

In any case, I now think that we should not introduce a new push-down 
design in conjunction with DSv2. Let's get DSv2 working properly and redesign 
push-down separately. In parallel is fine by me.


---




[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...

2018-02-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20476
  
@gatorsmile, Do you mean this?

> Extensibility is not good and operator push-down capabilities are limited.

If so, that's very open to interpretation. I would assume it means that the 
V2 interfaces should support more than just projection and filter push-down, 
but not a redesign of how push-down happens in the optimizer. Even if it is 
called out as a goal, I now see it as a misguided choice.

But either way, you make a good point about changing things for a release. 
I'll defer to your judgement about what should be done for the release. But for 
the long term, I think this issue underscores my point about reusing code that 
already works. Let's separate DSv2 from a push-down redesign and get it working 
reliably without introducing more risk and unknown problems.


---




[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...

2018-02-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20476
  
@gatorsmile, thanks for the context. If we need to redesign push-down, then 
I think we should do that separately and with a design plan.

**I don't think it's a good idea to bundle it into an unrelated API 
update.**

For one thing, we want to be able to use the existing tests for the 
redesigned push-down strategy, not reimplement them in pieces. We also don't 
want to conflate the two changes for early adopters of the new API. V2 should 
be as reliable as possible by minimizing new behavior.

This just isn't the right place to test out experimental designs for 
push-down operations.


---




[GitHub] spark issue #20466: [SPARK-23293][SQL] fix data source v2 self join

2018-02-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20466
  
+1

Good to get this in before changes to the relation.


---




[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...

2018-02-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20476
  
@cloud-fan, @gatorsmile, this PR demonstrates why we should use 
PhysicalOperation. I ported the tests from this PR over to our branch and they 
pass without modifying the push-down code. That's because it reuses code that 
we already trust.

I see no benefit to using a brand new code path for push-down when we can 
use what is already well tested. I know you want to push other operations, but 
I've already raised concerns about the design of this new code: it is brittle 
because it requires matching specific plan nodes.

Push-down should work as it always has: by pushing nodes that are adjacent 
to relations in the logical plan and relying on the optimizer to push 
projections and filters down as far as possible. The separation of concerns 
into simple rules is fundamental to the design of the optimizer. I don't think 
there is a good argument for new code that breaks how the optimizer is intended 
to work.

cc @henryr, who might want to chime in.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
@dongjoon-hyun, @gatorsmile, could you guys weigh in on some of this 
discussion? I'd like to get additional perspectives on the changes I'm 
proposing.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> Let's keep it general and let the data source to interprete it.

I think this is the wrong approach. The reason why we are using a special 
`DataSourceOptions` object is to ensure that data sources consistently ignore 
case when reading **their own options**. Consistency across data sources 
matters and we should be pushing for more consistency, not less.
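
A tiny illustration of the point, assuming the 2.3-era `DataSourceOptions` 
behavior of normalizing keys to lower case:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Keys are normalized once, centrally, so every source gets the same
// case-insensitive behavior when reading its own options.
val opts = new DataSourceOptions(Map("Path" -> "/data/events").asJava)
assert(opts.get("path").get == "/data/events")
assert(opts.get("PATH").get == "/data/events")
```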


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
@cloud-fan, to your point about push-down order, I'm not saying that order 
doesn't matter at all, I'm saying that the push-down can run more than once and 
it should push the closest operators. That way, if you have a situation where 
operators can't be reordered but they can all be pushed, they all get pushed 
through multiple runs of the rule, each one further refining the relation.

If we do it this way, then we don't need to traverse the logical plan to 
find out what to push down. We continue pushing projections until the plan 
stops changing. This is how the rest of the optimizer works, so I think it is a 
better approach from a design standpoint.

My implementation also reuses more existing code that we have higher 
confidence in, which is a good thing. We can add things like limit pushdown 
later, by adding it properly to the existing code. I don't see a compelling 
reason to toss out the existing implementation, especially without the same 
level of testing.
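
To illustrate the shape I mean, here is a conceptual sketch only; `withFilters` 
and `withProjection` are hypothetical helpers standing in for however the 
relation records pushed operators:

```scala
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Conceptual sketch: push only the operator adjacent to the relation and let
// the optimizer's fixed-point iteration do the rest. Each run refines the
// relation; the batch stops once the plan no longer changes.
object PushIntoV2Relation extends Rule[LogicalPlan] with PredicateHelper {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case filter @ Filter(condition, relation: DataSourceV2Relation) =>
      // keep the residual Filter; withFilters (hypothetical) only records
      // which predicates were handed to the source
      filter.copy(child = relation.withFilters(splitConjunctivePredicates(condition)))
    case project @ Project(fields, relation: DataSourceV2Relation) =>
      // withProjection (hypothetical) prunes the relation to the referenced columns
      project.copy(child = relation.withProjection(fields.flatMap(_.references)))
  }
}
```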


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> `spark.read.format("iceberg").table("db.table").load()`

I'm fine with this if you think it is confusing to parse the path as a 
table name in load. I think it is reasonable.

I'd still like to keep the `Option[TableIdentifier]` parameter on the 
relation, so that we can support `table` or `insertInto` on the write path.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
@felixcheung, yes, we do already have a `table` option. That creates an 
`UnresolvedRelation` with the parsed table name as a `TableIdentifier`, which 
is not currently compatible with `DataSourceV2` because there is no standard 
way to pass the identifier's db and table name.

Part of the intent here is to add support in `DataSourceV2Relation` for 
cases where we have a `TableIdentifier`, so that we can add a resolver rule 
that replaces `UnresolvedRelation` with `DataSourceV2Relation`. This is what we 
do in our Spark branch.

@cloud-fan, what is your objection to support like this?
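
For reference, the resolver rule is roughly shaped like this (a sketch, not our 
branch's code; `lookupV2Source` is a hypothetical stand-in for however the 
source behind a table name is found):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.DataSourceV2

// Sketch of an analyzer rule that turns an unresolved table reference into a
// DataSourceV2Relation carrying the TableIdentifier, using the relation
// signature proposed in this PR.
class ResolveV2Tables(lookupV2Source: TableIdentifier => Option[DataSourceV2])
    extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case unresolved @ UnresolvedRelation(ident) =>
      lookupV2Source(ident)
        .map(source => DataSourceV2Relation(source, options = Map.empty, table = Some(ident)))
        .getOrElse(unresolved)
  }
}
```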


---




[GitHub] spark issue #20454: [SPARK-23202][SQL] Add new API in DataSourceWriter: onDa...

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20454
  
+1

I'd rather not add features without a known use case, but this 
implementation looks good to me.


---




[GitHub] spark pull request #20454: [SPARK-23202][SQL] Add new API in DataSourceWrite...

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20454#discussion_r165141464
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 ---
@@ -62,6 +62,15 @@
*/
   DataWriterFactory createWriterFactory();
 
+  /**
+   * Handles a commit message on receiving from a successful data writer.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
+   * is undefined and {@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
--- End diff --

What does it mean that "the state of the destination is undefined"? I think 
it is sufficient to say that `abort` will be called and the contract for 
aborting commits applies.


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r165138574
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should 
ensure that this method is
* idempotent. The execution engine may call commit() multiple times for 
the same epoch
--- End diff --

If that's the case, then this interface should be clear about it instead of 
including wording about exactly-once. For this interface, there is no 
exactly-once guarantee.


---




[GitHub] spark issue #20448: [SPARK-23203][SQL] make DataSourceV2Relation immutable

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20448
  
@cloud-fan, **please close this PR**. There is already a pull request for 
these changes, #20387, and ongoing discussion there.

If you want the proposed implementation to change, please ask for changes 
in a review.


---




[GitHub] spark pull request #20448: [SPARK-23203][SQL] make DataSourceV2Relation immu...

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20448#discussion_r165132718
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,36 +17,84 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.StructType
 
+/**
+ * A logical plan representing a data source relation, which will be 
planned to a data scan
+ * operator finally.
+ *
+ * @param output The output of this relation.
+ * @param source The instance of a data source v2 implementation.
+ * @param options The options specified for this scan, used to create the 
`DataSourceReader`.
+ * @param userSpecifiedSchema The user specified schema, used to create 
the `DataSourceReader`.
+ * @param filters The predicates which are pushed and handled by this data 
source.
+ * @param existingReader A mutable reader carrying some temporary stats 
during optimization and
+ *   planning. It's always None before optimization, 
and does not take part in
+ *   the equality of this plan, which means this plan 
is still immutable.
+ */
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder 
{
+output: Seq[AttributeReference],
+source: DataSourceV2,
+options: DataSourceOptions,
+userSpecifiedSchema: Option[StructType],
+filters: Set[Expression],
+existingReader: Option[DataSourceReader]) extends LeafNode with 
DataSourceV2QueryPlan {
+
+  override def references: AttributeSet = AttributeSet.empty
+
+  override def sourceClass: Class[_ <: DataSourceV2] = source.getClass
 
   override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
 
+  def reader: DataSourceReader = existingReader.getOrElse {
+(source, userSpecifiedSchema) match {
+  case (ds: ReadSupportWithSchema, Some(schema)) =>
+ds.createReader(schema, options)
+
+  case (ds: ReadSupport, None) =>
+ds.createReader(options)
+
+  case (ds: ReadSupport, Some(schema)) =>
+val reader = ds.createReader(options)
+// Sanity check, this should be guaranteed by 
`DataFrameReader.load`
+assert(reader.readSchema() == schema)
+reader
+
+  case _ => throw new IllegalStateException()
+}
+  }
+
   override def computeStats(): Statistics = reader match {
 case r: SupportsReportStatistics =>
   Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
 case _ =>
   Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
+
+  override def simpleString: String = s"Relation $metadataString"
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, 
reader) {
+case class StreamingDataSourceV2Relation(
--- End diff --

Agreed. That was the plan here.


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r165131292
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should 
ensure that this method is
* idempotent. The execution engine may call commit() multiple times for 
the same epoch
--- End diff --

For a commit interface, I expect the guarantee to be that data is committed 
exactly once. If commits are idempotent, data may be reprocessed, and commits 
may happen more than once, then that is not an exactly-once commit: that is an 
at-least-once commit.

I'm not trying to split hairs. My point is that if there's no difference in 
behavior between exactly-once and at-least-once because the commit must be 
idempotent, then you don't actually have an exactly-once guarantee.
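
To make the distinction concrete, here is a sketch of what the sink needs in 
order to be effectively exactly-once; `committedEpochs` is a hypothetical 
stand-in for whatever transactional metadata the sink keeps:

```scala
import scala.collection.mutable
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Replaying an epoch is at-least-once on the processing side; it only becomes
// effectively exactly-once if the sink can detect that the epoch was already
// committed and turn the retry into a no-op.
def commitEpoch(epochId: Long,
                messages: Seq[WriterCommitMessage],
                committedEpochs: mutable.Set[Long]): Unit = {
  if (!committedEpochs.contains(epochId)) {
    // ... atomically make this epoch's output visible using `messages` ...
    committedEpochs += epochId
  }
}
```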


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r165121965
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should 
ensure that this method is
* idempotent. The execution engine may call commit() multiple times for 
the same epoch
--- End diff --

Thanks for this explanation, I think I see what you're saying. But I think 
your statement that refers to "true" exactly-once gives away the fact that this 
does not provide exactly-once semantics.

Maybe this is a question for the dev list: why the weaker version? 
Shouldn't this API provide a check to see whether the data was already 
committed?


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r165119560
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 ---
@@ -63,32 +68,42 @@
   DataWriterFactory createWriterFactory();
 
   /**
-   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
-   * successful data writers and are produced by {@link 
DataWriter#commit()}.
+   * Handles a commit message which is collected from a successful data 
writer.
+   *
+   * Note that, implementations might need to cache all commit messages 
before calling
+   * {@link #commit()} or {@link #abort()}.
*
* If this method fails (by throwing an exception), this writing job is 
considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
-   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
+   */
+  void add(WriterCommitMessage message);
--- End diff --

+1 for separating and using another PR. Thanks.


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r165119427
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
--- End diff --

Passing the messages to commit and abort seems simpler and better to me, 
but that's for the batch side. And, we shouldn't move forward with this unless 
there's a use case.

As for the docs here, what is an implementer intended to understand as a 
result of this? "The number of data partitions to write" is also misleading: 
weren't these already written and committed by tasks?


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r165117779
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should 
ensure that this method is
* idempotent. The execution engine may call commit() multiple times for 
the same epoch
* in some circumstances.
*/
-  void commit(long epochId, WriterCommitMessage[] messages);
+  void commit(long epochId);
 
   /**
-   * Aborts this writing job because some data writers are failed and keep 
failing when retry, or
-   * the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])} fails.
+   * Aborts this writing job because some data writers are failed and keep 
failing when retry,
+   * or the Spark job fails with some unknown reasons,
+   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
*
* If this method fails (by throwing an exception), the underlying data 
source may require manual
* cleanup.
*
-   * Unless the abort is triggered by the failure of commit, the given 
messages should have some
-   * null slots as there maybe only a few data writers that are committed 
before the abort
-   * happens, or some data writers were committed but their commit 
messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" 
for data sources to
-   * clean up the data left by data writers.
+   * Unless the abort is triggered by the failure of commit, the number of 
commit
+   * messages added by {@link #add(WriterCommitMessage)} should be smaller 
than the number
+   * of input data partitions, as there may be only a few data writers 
that are committed
+   * before the abort happens, or some data writers were committed but 
their commit messages
+   * haven't reached the driver when the abort is triggered. So this is 
just a "best effort"
--- End diff --

Best effort is not just how we describe the behavior, it is a requirement 
of the contract. Spark should not drop commit messages because it is 
convenient. Spark knows what tasks succeeded and failed and which ones were 
authorized to commit. That's enough information to provide the best-effort 
guarantee.


---




[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

2018-01-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20386
  
> I assume this API is necessary . . . it sounds reasonable to provide a 
callback for task commit.

I agree it sounds reasonable, but we shouldn't add methods to a new API 
blindly and without a use case. The point of a new API, at least in part, is to 
improve on the old one. If it is never used, then we are carrying support for 
something that is useless. On the other hand, if it is used we should know what 
it is needed for so we can design for the use case.


---




[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

2018-01-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20386
  
@gengliangwang, what is the use case supported by this? In other words, how 
is `onTaskCommit(taskCommit: TaskCommitMessage)` currently used that requires 
this change?

In general, I'm more concerned with the batch side and I don't have a huge 
problem with this change. I do want to make sure it is in support of a valid 
use case. I'd also rather separate the batch and streaming committer APIs 
because they have so little in common.


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164909970
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 ---
@@ -63,32 +68,42 @@
   DataWriterFactory createWriterFactory();
 
   /**
-   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
-   * successful data writers and are produced by {@link 
DataWriter#commit()}.
+   * Handles a commit message which is collected from a successful data 
writer.
+   *
+   * Note that, implementations might need to cache all commit messages 
before calling
+   * {@link #commit()} or {@link #abort()}.
*
* If this method fails (by throwing an exception), this writing job is 
considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
-   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
+   */
+  void add(WriterCommitMessage message);
--- End diff --

This is the only method shared between the stream and batch writers. Why 
does the streaming interface extend this one?


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164909653
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should 
ensure that this method is
* idempotent. The execution engine may call commit() multiple times for 
the same epoch
--- End diff --

I realize this isn't part of this commit, but why would an exactly-once 
guarantee require idempotent commits? Processing the same data twice with an 
idempotent guarantee is not the same thing as exactly-once.


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164909225
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should 
ensure that this method is
* idempotent. The execution engine may call commit() multiple times for 
the same epoch
* in some circumstances.
*/
-  void commit(long epochId, WriterCommitMessage[] messages);
+  void commit(long epochId);
 
   /**
-   * Aborts this writing job because some data writers are failed and keep 
failing when retry, or
-   * the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])} fails.
+   * Aborts this writing job because some data writers are failed and keep 
failing when retry,
+   * or the Spark job fails with some unknown reasons,
+   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
*
* If this method fails (by throwing an exception), the underlying data 
source may require manual
* cleanup.
*
-   * Unless the abort is triggered by the failure of commit, the given 
messages should have some
-   * null slots as there maybe only a few data writers that are committed 
before the abort
-   * happens, or some data writers were committed but their commit 
messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" 
for data sources to
-   * clean up the data left by data writers.
+   * Unless the abort is triggered by the failure of commit, the number of 
commit
+   * messages added by {@link #add(WriterCommitMessage)} should be smaller 
than the number
+   * of input data partitions, as there may be only a few data writers 
that are committed
+   * before the abort happens, or some data writers were committed but 
their commit messages
+   * haven't reached the driver when the abort is triggered. So this is 
just a "best effort"
--- End diff --

Commit messages in flight should be handled and aborted. Otherwise, this 
isn't a "best effort". Best effort means that Spark does everything that is 
feasible to ensure that commit messages are added before aborting, and that 
should include race conditions from RPC.

The case where "best effort" might miss a message is if the message is 
created, but a node fails before it is sent to the driver.


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164908529
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 ---
@@ -63,32 +68,42 @@
   DataWriterFactory createWriterFactory();
 
   /**
-   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
-   * successful data writers and are produced by {@link 
DataWriter#commit()}.
+   * Handles a commit message which is collected from a successful data 
writer.
+   *
+   * Note that, implementations might need to cache all commit messages 
before calling
+   * {@link #commit()} or {@link #abort()}.
--- End diff --

In what case would an implementation not cache and commit all at once? What 
is the point of a commit if not to make sure all of the data shows up at the 
same time?
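
For reference, this is the pattern the question assumes, sketched against the 
add/commit/abort split proposed in this diff. The staging scheme and helper 
names are made up for illustration:

    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

    // Tasks stage their output and report where it is; nothing becomes visible
    // until commit() publishes everything in one step.
    case class Staged(path: String) extends WriterCommitMessage

    class BufferingWriter {
      private val staged = scala.collection.mutable.ArrayBuffer.empty[Staged]

      def add(message: WriterCommitMessage): Unit =
        staged += message.asInstanceOf[Staged]

      def commit(): Unit =
        // all of the data shows up at the same time: publish every staged path
        // together (rename into place, swap a table pointer, and so on)
        publishAtomically(staged.map(_.path))

      def abort(): Unit = staged.map(_.path).foreach(deleteStaged)

      // hypothetical helpers standing in for source-specific logic
      private def publishAtomically(paths: Seq[String]): Unit = ???
      private def deleteStaged(path: String): Unit = ???
    }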


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164907626
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
--- End diff --

What does this mean? It isn't clear to me what "the number of input 
partitions" means, or why it isn't obvious that it is equal to the number of 
pending `WriterCommitMessage` instances passed to add.


---




[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20386#discussion_r164907397
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
 ---
@@ -32,40 +32,44 @@
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
   /**
-   * Commits this writing job for the specified epoch with a list of 
commit messages. The commit
-   * messages are collected from successful data writers and are produced 
by
-   * {@link DataWriter#commit()}.
+   * Commits this writing job for the specified epoch.
*
-   * If this method fails (by throwing an exception), this writing job is 
considered to have been
-   * failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
+   * When this method is called, the number of commit messages added by
+   * {@link #add(WriterCommitMessage)} equals to the number of input data 
partitions.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort()} would be called. The state of the 
destination
+   * is undefined and @{@link #abort()} may not be able to deal with it.
--- End diff --

Nit: javadoc typo.


---




[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

2018-01-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20386
  
@cloud-fan, is the intent to get this into 2.3.0? If so, I'll make time to 
review it today.


---




[GitHub] spark pull request #20427: [SPARK-23260][SPARK-23262][SQL] several data sour...

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20427#discussion_r164807449
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
 ---
@@ -25,7 +25,7 @@
  * session.
  */
 @InterfaceStability.Evolving
-public interface SessionConfigSupport {
+public interface SessionConfigSupport extends DataSourceV2 {
--- End diff --

Ping me on the new PR. I'm happy to review it (though it is non-binding).


---




[GitHub] spark pull request #20427: [SPARK-23260][SPARK-23262][SQL] several data sour...

2018-01-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20427#discussion_r164806676
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
 ---
@@ -25,7 +25,7 @@
  * session.
  */
 @InterfaceStability.Evolving
-public interface SessionConfigSupport {
+public interface SessionConfigSupport extends DataSourceV2 {
--- End diff --

Mixing large migration commits like this one with unrelated changes makes 
it harder to pick or revert changes without unintended side-effects. What 
happens if we realize that this rename was a bad idea? Reverting this commit 
would also revert the constraint that SessionConfigSupport extends 
DataSourceV2. Similarly, if we realize that these mix-ins don't need to extend 
DataSourceV2, then we would have to find and remove them all instead of 
reverting a commit. That might even sound okay, but when you're picking commits 
deliberately to patch branches, you need to make as few changes as possible and 
cherry-pick conflicts make that much harder.

The fact that you're rushing to get commits into 2.3 is even more 
concerning and reason to be careful, not a reason to relax our standards. 
Please move this to its own PR and fix all of the interfaces at once.


---




[GitHub] spark pull request #20427: [SPARK-23260][SQL] remove V2 from the class name ...

2018-01-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20427#discussion_r164621094
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
 ---
@@ -25,7 +25,7 @@
  * session.
  */
 @InterfaceStability.Evolving
-public interface SessionConfigSupport {
+public interface SessionConfigSupport extends DataSourceV2 {
--- End diff --

It's best to keep commits clean and focused. I'd say create a new JIRA for 
it and do all of the mix-ins at once.

+1 when this is separated to its own PR. Thanks!


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> It's hard to improve PhysicalOperation to support more operators and 
specific push down orders, so I created the new one

I'm concerned about the new one. The projection support seems really 
brittle because it calls out specific logical nodes and scans the entire plan. 
If we are doing push-down wrong on the current v1 and Hive code paths, then I'd 
like to see a proposal for fixing that without these drawbacks.

I like that this PR pushes projections and filters just like the other 
paths. We should start there and add additional push-down as necessary.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> [The push-down rule may be run more than once if filters are not pushed 
through projections] looks weird, do you have a query to reproduce this issue?

One of the DataSourceV2 tests hit this. I thought it was a good thing to 
push a single node down at a time and not depend on order.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> I'd suggest that we just propogate the paths parameter to options, and 
data source implementations are free to interprete the path option to whatever 
they want, e.g. table and database names.

What about code paths that expect table names? In our branch, we've added 
support for converting Hive relations (which have a `TableIdentifier`, not a 
path) and using `insertInto`. Table names and paths are the two main ways to 
identify tables and I think both should be supported.

This is a new API, so it doesn't matter that `load` and `save` currently 
use paths. We can easily update that support for tables. If we don't, then 
there will be no common way to refer to tables: some implementations will use 
`table`, some will pass `db` separately, and some might use `database`. 
Standardizing this and adding support in Spark will produce more consistent 
behavior across data sources.
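
As a rough illustration of the difference, using the relation constructor 
proposed in this PR (the `source` value and the names below are placeholders):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
    import org.apache.spark.sql.sources.v2.DataSourceV2

    val source: DataSourceV2 = ???   // some DataSourceV2 implementation

    // Path-based: the location is the identifier.
    val byPath = DataSourceV2Relation(
      source = source,
      options = Map.empty,
      path = Some("s3://bucket/db/events"))

    // Table-based: the name travels as a TableIdentifier, the same class the rest
    // of the planner uses, instead of ad-hoc "database"/"table" string options.
    val byName = DataSourceV2Relation(
      source = source,
      options = Map.empty,
      table = Some(TableIdentifier("events", Some("db"))))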


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
> I'm ok to make it immutable if there is an significant benefit.

Mutable nodes violate a basic assumption of catalyst, that trees are 
immutable. Here's a good quote from the SIGMOD paper (by @rxin, @yhuai, and 
@marmbrus et al.):

> In our experience, functional transformations on immutable trees make the 
whole optimizer very easy to reason about and debug. They also enable 
parallelization in the optimizer, although we do not yet exploit this.

Mixing mutable nodes into supposedly immutable trees is a bad idea. Other 
nodes in the tree assume that children do not change.


---




[GitHub] spark pull request #20427: [SPARK-23260][SQL] remove V2 from the class name ...

2018-01-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20427#discussion_r164544288
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
 ---
@@ -25,7 +25,7 @@
  * session.
  */
 @InterfaceStability.Evolving
-public interface SessionConfigSupport {
+public interface SessionConfigSupport extends DataSourceV2 {
--- End diff --

Why does this need to extend DataSourceV2? Why add this in a commit that 
appears to be nothing more than a rename?


---




[GitHub] spark pull request #20427: [SPARK-23260][SQL] remove V2 from the class name ...

2018-01-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20427#discussion_r164543216
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---
@@ -18,23 +18,23 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 
 /**
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
  * provide data reading ability and scan the data from the data source.
  */
 @InterfaceStability.Evolving
-public interface ReadSupport {
+public interface ReadSupport extends DataSourceV2 {
 
   /**
-   * Creates a {@link DataSourceV2Reader} to scan the data from this data 
source.
+   * Creates a {@link DataSourceReader} to scan the data from this data 
source.
*
* If this method fails (by throwing an exception), the action would 
fail and no Spark job was
* submitted.
*
* @param options the options for the returned data source reader, which 
is an immutable
*case-insensitive string-to-string map.
*/
-  DataSourceV2Reader createReader(DataSourceV2Options options);
+  DataSourceReader createReader(DataSourceV2Options options);
--- End diff --

Why not rename options as well?


---




[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20397
  
This is more confusing, not less. Look at @jiangxb1987's comment above: "We 
shall create only one DataReaderFactory, and have that create multiple data 
readers." It is not clear why the API requires a list of factories, instead of 
using just one. If this is renamed to factory, is it a requirement that the 
factory can create more than one data reader for the same task?

To the point about serializing and sending to executors, "factory" doesn't 
imply that any more than "task" does. The fact that these are serialized needs 
to be clear in documentation.

The read and write sides behave differently. They do not need to mirror one 
another's naming when that makes names less precise. This isn't forcing users 
to look at a subtle difference. It is just breaking the (wrong) assumption that 
both read and write sides have the same behavior.

@rxin, any opinion here?


---




[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20397
  
> I think the renaming is worth to remove future confusions.

What future confusion?

I understand that the difference isn't obvious, but making the names less 
accurate isn't a good fix. The read and write sides don't have to mirror one 
another. They behave differently and that's okay. Names should be based on what 
the classes actually do.


---




[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20397
  
One last point: should significant changes to public APIs like this go in 
just before or just after a release? 2.3.0 candidates have used ReadTask up to 
now.


---




[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...

2018-01-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20397
  
@cloud-fan, thanks for pinging me on this.

-1: I don't think there's a compelling benefit to justify this change, and 
I think it makes the API more confusing. I think we should revert this.

This class doesn't actually behave as a factory and is used more like an 
Iterable: it is only used to instantiate one DataReader and carries no explicit 
guarantee that it can be reused. In addition, the piece of work that each one 
represents is a task, which becomes an actual task when the stage runs. I would 
much rather keep the ReadTask name to make that connection clear.

The write side does behave like a factory, so the name is appropriate 
there. There is little value to uniform names if the names actually make the 
API more confusing.
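
To spell out the shape I mean (simplified names, not the exact Spark 
interfaces): each instance describes one unit of scan work, is serialized to an 
executor, and is used to open a single reader there.

    // Sketch only: a simplified stand-in for the pre-rename ReadTask shape.
    trait ScanTask[Row] extends Serializable {
      def preferredLocations: Seq[String]   // locality hint; Spark may ignore it
      def openReader(): Iterator[Row]       // invoked once, for one task attempt
    }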


---




[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...

2018-01-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r164169060
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,15 +17,149 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceV2Reader) extends LeafNode with 
DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

That's why these are options. Passing either path or table name is the most 
common case, which we should have good support for. If tables are identified in 
other ways, that's supported.


---




[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...

2018-01-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r163913161
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,15 +17,149 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceV2Reader) extends LeafNode with 
DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I guess another way to say this is that it's better to set reliable path, 
database, and table parameters by passing them explicitly than to require 
every place where a DataSourceV2Relation is created to do the same thing. 
Better to standardize how these options are passed in `v2Options`, and it 
would be even better to pass them directly to the readers and writers.


---




[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...

2018-01-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r163909751
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,15 +17,149 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceV2Reader) extends LeafNode with 
DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
--- End diff --

I'm not sure I understand what you mean. When something is pushed, it 
creates a new immutable relation, so I think it has to be added to the 
relation. But I'm not sure that many things will be pushed besides the 
projection and filters. What are you thinking that we would need to add? 
Fragments of logical plan?

Assuming we add the ability to push parts of the logical plan, then this 
would need to have a reference to the part that was pushed down. I'm not sure 
that would be this relation class, a subclass, or something else, but I would 
be fine adding a third push-down option here. The number of things to push down 
isn't very large, is it?


---




[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...

2018-01-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r163908263
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,15 +17,149 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceV2Reader) extends LeafNode with 
DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

We could keep these in options, but because they are the main two ways to 
identify tables, they should be easier to work with. I'd even suggest adding 
them to the DataSourceV2 read and write APIs.

Another benefit of adding these is that it is easier to use 
DataSourceV2Relation elsewhere. In our Spark build, I've added a rule to 
convert Hive relations to DataSourceV2Relation based on a table property. 
That's cleaner because we can pass the TableIdentifier instead of adding 
options to the map.


---




[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...

2018-01-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20387
  
@cloud-fan, please have a look at these changes. This will require 
follow-up for the Streaming side. I have yet to review the streaming interfaces 
for `DataSourceV2`, so I haven't made any changes there.

In our Spark build, I've also moved the write path to  use 
DataSourceV2Relation, which I intend to do in a follow-up to this issue.

@rxin FYI.


---




[GitHub] spark pull request #20387: SPARK-22386: DataSourceV2: Use immutable logical ...

2018-01-24 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/20387

SPARK-22386: DataSourceV2: Use immutable logical plans.

## What changes were proposed in this pull request?

DataSourceV2 should use immutable catalyst trees instead of wrapping a 
mutable DataSourceV2Reader. This commit updates DataSourceV2Relation and 
consolidates much of the DataSourceV2 API requirements for the read path in it. 
Instead of wrapping a reader that changes, the relation lazily produces a 
reader from its configuration.

This commit also updates the predicate and projection push-down. Instead of 
the implementation from SPARK-22197, this reuses the rule matching from the 
Hive and DataSource read paths (using `PhysicalOperation`) and copies most of 
the implementation of `SparkPlanner.pruneFilterProject`, with updates for 
DataSourceV2. By reusing the implementation from other read paths, this should 
have fewer regressions from other read paths and is less code to maintain.

The new push-down rules also support the following edge cases:

* The output of DataSourceV2Relation should be what is returned by the 
reader, in case the reader can only partially satisfy the requested schema 
projection
* The requested projection passed to the DataSourceV2Reader should include 
filter columns
* The push-down rule may be run more than once if filters are not pushed 
through projections
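
For reviewers skimming this description, a rough sketch of the shape the 
push-down takes. This is simplified, not the exact rule in the patch: it does 
not ask the reader which filters it actually accepted, and it conservatively 
re-adds the full Filter/Project on top of the new relation.

    import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet}
    import org.apache.spark.sql.catalyst.planning.PhysicalOperation
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

    object PushDownSketch extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
        case PhysicalOperation(projects, filters, relation: DataSourceV2Relation) =>
          // the projection requested from the reader must also cover filter columns
          val required =
            AttributeSet(projects.flatMap(_.references) ++ filters.flatMap(_.references))
          val pushed = relation.copy(
            projection = Some(relation.output.filter(required.contains)),
            filters = Some(filters))
          // keep the original filters and projection in the plan, since the reader
          // may only partially satisfy what was pushed
          val filtered = filters.reduceLeftOption(And).map(Filter(_, pushed)).getOrElse(pushed)
          Project(projects, filtered)
      }
    }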

## How was this patch tested?

Existing push-down and read tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-22386-push-down-immutable-trees

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20387


commit d3233e1a8b1d4d153146b1a536dee34246920b0d
Author: Ryan Blue <blue@...>
Date:   2018-01-17T21:58:12Z

SPAKR-22386: DataSourceV2: Use immutable logical plans.




---




[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...

2018-01-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19861
  
Thanks for the example, @cloud-fan.


---




[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...

2018-01-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19861
  
@jiangxb1987, I understand what this does. I just wanted an example use 
case where it was necessary. What was the motivating *use case*?


---




[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...

2018-01-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19861
  
@jiangxb1987, @cloud-fan, what was the use case you needed to add this for?


---




[GitHub] spark issue #20201: [SPARK-22389][SQL] data source v2 partitioning reporting...

2018-01-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20201
  
@cloud-fan, please ping me to review PRs for DataSourceV2. Our new table 
format uses it and we're preparing some changes, so I want to make sure we're 
heading in the same direction for this.


---




[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-11-28 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/spark/pull/19568


---




[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-11-28 Thread rdblue
GitHub user rdblue reopened a pull request:

https://github.com/apache/spark/pull/19568

SPARK-22345: Fix sort-merge joins with conditions and codegen.

## What changes were proposed in this pull request?

This adds a joined row to sort-merge join codegen. That joined row is used 
to generate code for filter expressions, which may fall back to using the 
result row. Previously, the right side of the join was used, which is incorrect 
(the non-codegen implementations use a joined row).
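
For context, a tiny illustration of what "joined row" means here, using 
catalyst internals directly (the values are arbitrary):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.JoinedRow

    val left  = InternalRow(1, 2L)   // two columns from the left side
    val right = InternalRow(3)       // one column from the right side
    // The non-codegen join evaluates its condition against this concatenated view,
    // so fallback code generated for the condition needs the same row layout.
    val joined: InternalRow = new JoinedRow(left, right)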

## How was this patch tested?

Current tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-22345-fix-sort-merge-codegen

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19568






---




[GitHub] spark issue #19568: SPARK-22345: Fix sort-merge joins with conditions and co...

2017-11-28 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19568
  
@gatorsmile, I think it would be better to fix codegen than to prevent it 
from happening with an assertion. If `CodegenFallback` can produce fallback 
code, why not allow it to when necessary?


---




[GitHub] incubator-toree pull request #147: Support for --spark-context-initializatio...

2017-11-20 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/147#discussion_r152119598
  
--- Diff: 
kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala ---
@@ -193,6 +199,19 @@ class CommandLineOptions(args: Seq[String]) {
 Some(userDefined.asJava)
   }
 
+  private def onlyPositiveOtherwiseDefault(spec: OptionSpec[Int]): 
Option[Int] = {
--- End diff --

I don't think it is a good idea to silently ignore user input. If the value 
passed is -1, for example, then the user gets 100ms. If what the user requests 
isn't possible, then this should fail and print the help message.
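
Something along these lines would fail fast instead of substituting the 
default; the helper and the option name below are hypothetical:

    // Sketch only: validate the parsed value instead of silently replacing it.
    def requirePositive(name: String, value: Option[Int]): Option[Int] =
      value.map { v =>
        require(v > 0, s"$name must be a positive integer, got $v (see --help)")
        v
      }

    // requirePositive("--some-timeout", Some(-1)) throws IllegalArgumentException
    // rather than handing back the built-in default.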


---


[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...

2017-11-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19623#discussion_r148875607
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -50,28 +53,34 @@
 
   /**
* Creates a writer factory which will be serialized and sent to 
executors.
+   *
+   * If this method fails (by throwing an exception), the action would 
fail and no Spark job was
+   * submitted.
*/
   DataWriterFactory createWriterFactory();
 
   /**
* Commits this writing job with a list of commit messages. The commit 
messages are collected from
-   * successful data writers and are produced by {@link 
DataWriter#commit()}. If this method
-   * fails(throw exception), this writing job is considered to be failed, 
and
-   * {@link #abort(WriterCommitMessage[])} will be called. The written 
data should only be visible
-   * to data source readers if this method succeeds.
+   * successful data writers and are produced by {@link 
DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
+   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
*
* Note that, one partition may have multiple committed data writers 
because of speculative tasks.
* Spark will pick the first successful one and get its commit message. 
Implementations should be
--- End diff --

I haven't read Steve's points here entirely, but I agree that Spark should 
be primarily responsible for task commit coordination. Most implementations 
would be fine using the current [output commit 
coordinator](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala),
 which does a good job balancing the trade-offs that you've been discussing. It 
ensures that only one task is authorized to commit and has well-defined failure 
cases (when a network partition prevents the authorized committer from 
responding before its commit authorization times out).

I think that Spark should use the current commit coordinator unless an 
implementation opts out of using it (and I'm not sure that opting out is a use 
case we care to support at this point). It's fine if Spark documents how its 
coordinator works and there are some drawbacks, but expecting implementations 
to handle their own commit coordination (which requires RPC for Spark) is, I 
think, unreasonable. Let's use the one we have by default, however imperfect.
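
The pattern is simple enough to sketch without the real classes (this is not 
the actual OutputCommitCoordinator API, just the idea): the task asks the 
driver for permission before committing, and at most one attempt per partition 
ever gets it.

    // Sketch only: driver-side bookkeeping in the spirit of the commit coordinator.
    class CommitAuthority {
      private val authorized = scala.collection.mutable.Map.empty[Int, Int]  // partition -> attempt

      def canCommit(partition: Int, attemptNumber: Int): Boolean = synchronized {
        authorized.get(partition) match {
          case None                          => authorized(partition) = attemptNumber; true
          case Some(a) if a == attemptNumber => true    // retry of the authorized attempt
          case _                             => false   // a different attempt already won
        }
      }
    }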


---




[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...

2017-11-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19623#discussion_r148849054
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---
@@ -36,14 +36,24 @@
   /**
* The preferred locations where this read task can run faster, but 
Spark does not guarantee that
* this task will always run on these locations. The implementations 
should make sure that it can
-   * be run on any location. The location is a string representing the 
host name of an executor.
+   * be run on any location. The location is a string representing the 
host name.
+   *
+   * Note that if a host name cannot be recognized by Spark, it will be 
ignored as it was not in
+   * the returned locations. By default this method returns empty string, 
which means this task
--- End diff --

This isn't the empty string, it is a 0-length array.


---




[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...

2017-11-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19623#discussion_r148848790
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java 
---
@@ -34,11 +35,17 @@
 
   /**
* Proceed to next record, returns false if there is no more records.
+   *
+   * If this method fails (by throwing an exception), the corresponding 
Spark task would fail and
+   * get retried until hitting the maximum retry times.
*/
-  boolean next();
+  boolean next() throws IOException;
--- End diff --

Should clarify when it is okay to throw IOException with `@throws`.


---




[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...

2017-11-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19623#discussion_r148848619
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---
@@ -30,6 +30,9 @@
   /**
* Creates a {@link DataSourceV2Reader} to scan the data from this data 
source.
*
+   * If this method fails (by throwing an exception), the action would 
fail and no Spark job was
--- End diff --

Are there recommended exceptions to throw? It would be great if this 
specified what exception classes implementations should use for problems with 
`@throws` javadoc entries. For example, `@throws AnalysisException If ...`.


---




[GitHub] spark issue #19568: SPARK-22345: Fix sort-merge joins with conditions and co...

2017-10-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19568
  
@DonnyZone, I don't know of any cases that use codegen after the fix for 
`CodegenFallback`, but I think this is still a good idea.

If Spark is going to generate code, it should generate correct code. That 
means either we remove the codegen implementation from `CodegenFallback`, or we 
fix the row passed in by sort-merge join. I'd rather fix sort-merge join 
because we may want to change the behavior to codegen as much of the condition 
as possible later on.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13206: [SPARK-15420] [SQL] Add repartition and sort to prepare ...

2017-10-27 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/13206
  
We still maintain a version of this for our Spark builds to avoid an extra 
sort in Hive. If someone is willing to review it, I can probably find the time 
to rebase it on master. I think this initially sat for a year just because the 
2.0 release was happening at the same time and there wasn't much bandwidth for 
reviews.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r147471530
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 ---
@@ -124,7 +125,8 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
 rightPlan: SparkPlan) = {
   val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, 
Inner, boundCondition,
 leftPlan, rightPlan)
-  EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
+  EnsureRequirements(spark.sessionState.conf)
+  .apply(ProjectExec(sortMergeJoin.output, sortMergeJoin))
--- End diff --

In 2.1.1, an extra project causes `WholeStageCodegenExec` to not detect 
that the expression contains `CodegenFallback`. This is no longer the case. 
Like I said, there is no longer a good way to test what happens when 
`CodegenFallback` generates code. If there were, I'd use that here to test the 
case.

I guess I could add a testing case to `WholeStageCodegenExec` to make sure 
the code is generated correctly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r147471054
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 ---
@@ -228,6 +230,27 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
 )
   )
 
+  testInnerJoin(
--- End diff --

This test fails in 2.1.1 and versions before 
https://github.com/apache/spark/commit/6b6dd682e84d3b03d0b15fbd81a0d16729e521d2.
 I'm not sure how to exercise the code generated by CodegenFallback with that 
fix, but this test is valid for the 2.1.1 branch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r14733
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -585,21 +585,26 @@ case class SortMergeJoinExec(
 
 val iterator = ctx.freshName("iterator")
 val numOutput = metricTerm(ctx, "numOutputRows")
+val joinedRow = ctx.freshName("joined")
 val (beforeLoop, condCheck) = if (condition.isDefined) {
   // Split the code of creating variables based on whether it's used 
by condition or not.
   val loaded = ctx.freshName("loaded")
   val (leftBefore, leftAfter) = splitVarsByCondition(left.output, 
leftVars)
   val (rightBefore, rightAfter) = splitVarsByCondition(right.output, 
rightVars)
+
   // Generate code for condition
+  ctx.INPUT_ROW = joinedRow
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r146975643
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -585,21 +585,26 @@ case class SortMergeJoinExec(
 
 val iterator = ctx.freshName("iterator")
 val numOutput = metricTerm(ctx, "numOutputRows")
+val joinedRow = ctx.freshName("joined")
--- End diff --

The second problem was fixed in this commit: 
https://github.com/apache/spark/commit/6b6dd682e84d3b03d0b15fbd81a0d16729e521d2

I still think that the codegen problem should be fixed. The `CodegenFallback` 
detection is imperfect, but Spark will still generate that code and run it. I 
think we should either remove codegen from `CodegenFallback` or add this fix to 
ensure that the generated code works, even if we don't expect to run it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r146974005
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -585,21 +585,26 @@ case class SortMergeJoinExec(
 
 val iterator = ctx.freshName("iterator")
 val numOutput = metricTerm(ctx, "numOutputRows")
+val joinedRow = ctx.freshName("joined")
--- End diff --

It ended up being a bit more complicated. There are two problems. The first 
is what this fixes, which is that the INPUT_ROW in the codegen context points 
to the wrong row. This is fixed and now has a test that fails if you uncomment 
the line that sets INPUT_ROW.

The second problem is that the check for `CodegenFallback` fails to detect 
whether the condition supports codegen in some plans. To get the test to fail, 
I had to add a projection to exercise the [path where this 
happens](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L524).
 I'll add a second commit for this problem.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r146970055
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -615,6 +620,7 @@ case class SortMergeJoinExec(
 }
 
 s"""
+   |$joinedRow = new JoinedRow();
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17813: [SPARK-20540][CORE] Fix unstable executor request...

2017-10-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/17813#discussion_r146939350
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -589,8 +605,18 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // take into account executors that are pending to be added or 
removed.
   val adjustTotalExecutors =
 if (!replace) {
-  doRequestTotalExecutors(
-numExistingExecutors + numPendingExecutors - 
executorsPendingToRemove.size)
+  requestedTotalExecutors = math.max(requestedTotalExecutors - 
executorsToKill.size, 0)
+  if (requestedTotalExecutors !=
--- End diff --

This is just informational. The problem is that the state of the allocation 
manager isn't synced with the scheduler. Instead, the allocator sends messages 
to try to control the scheduler backend to get the same state. For example, 
instead of telling the scheduler backend that the desired number of executors 
is 10, the allocator sends a message to add 2 executors. When this gets out of 
sync because of failures or network delay, you end up with these messages.

When you see these, make sure you're just out of sync (and will eventually 
get back in sync), and not in a state where the scheduler and allocator can't 
reconcile the required number of executors. That's what this PR tried to fix.

The long-term solution is to update the communication so that the allocator 
requests its ideal state, always telling the scheduler backend how many 
executors it currently needs, instead of killing or requesting more.
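
A hypothetical sketch of that declarative protocol (all names here are 
illustrative, not the current scheduler backend API): the allocator always 
declares the total it wants, and the backend reconciles against that target.

```scala
object DeclarativeAllocationSketch {
  final case class DesiredExecutors(total: Int)

  class SchedulerBackendStub {
    @volatile private var requestedTotal = 0

    // Idempotent: resending the same target is harmless, so a lost or delayed
    // message cannot leave the allocator and backend permanently out of sync.
    def reconcile(desired: DesiredExecutors): Unit = {
      requestedTotal = math.max(desired.total, 0)
      // ... ask the cluster manager for `requestedTotal` executors ...
    }
  }
}
```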


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r146928894
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -585,21 +585,26 @@ case class SortMergeJoinExec(
 
 val iterator = ctx.freshName("iterator")
 val numOutput = metricTerm(ctx, "numOutputRows")
+val joinedRow = ctx.freshName("joined")
--- End diff --

The joined row should always be used for correctness. We don't know what 
code the expression will generate, so we should plan on always passing the 
correct input row. Setting left and right on a joined row is a cheap operation, 
so I'd rather do it correctly than rely on something brittle like 
`isInstanceOf[CodegenFallback]`.
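
To make the cost and the correctness point concrete, a minimal illustration 
(this is not the generated code, just the interpreted-evaluation view):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow

// The joined row only wraps the two sides, so building/updating it is cheap.
val left   = InternalRow(1, 2L)
val right  = InternalRow(3)
val joined = new JoinedRow(left, right)

// An interpreted condition evaluates against a single row, so it must be
// handed `joined`; passing only `right` hides the left side's columns.
```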


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19568#discussion_r146928318
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -615,6 +620,7 @@ case class SortMergeJoinExec(
 }
 
 s"""
+   |$joinedRow = new JoinedRow();
--- End diff --

Yeah, that's causing the test failures. This is a typo from some 
restructuring I did to get this upstream.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19568: SPARK-22345: Fix sort-merge joins with conditions and co...

2017-10-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19568
  
@dongjoon-hyun, yes, I'm currently working on it. I just wanted to get the 
rest up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...

2017-10-24 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/19568

SPARK-22345: Fix sort-merge joins with conditions and codegen.

## What changes were proposed in this pull request?

This adds a joined row to sort-merge join codegen. That joined row is used 
to generate code for filter expressions, which may fall back to using the 
result row. Previously, the right side of the join was used, which is incorrect 
(the non-codegen implementations use a joined row).

## How was this patch tested?

Current tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-22345-fix-sort-merge-codegen

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19568


commit 4afb088a4fa2127cab7467cc56f58cd77bd8c251
Author: Ryan Blue <b...@apache.org>
Date:   2017-10-24T20:21:50Z

SPARK-22345: Fix sort-merge joins with conditions and codegen.

Code for the condition was generated to depend on the right row instead
of the joined row.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-13 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19448
  
I have a lot of sympathy for the argument that infrastructure software 
shouldn't have too many backports and that those should generally be bug fixes. 
But, if I were working on a Spark distribution at a vendor, this is something I 
would definitely include because it's such a useful feature. I think that by 
not backporting this, we're just pushing that work downstream. Plus, the risk 
of adding this is low: the main behavior change is that users can specify a 
previously-banned committer for Parquet writes. Is it a bug fix? Probably not. 
But it fixes a big blocker.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144388430
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think once per write operation is fine. It's not like it is once per file.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-12 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19448
  
Still +1 from me as well.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144331909
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think I'd prefer the warn & continue option. It does little good to fail so 
late in a job, when the caller has already indicated that they want to use a 
different committer. Let them write the data out since this isn't a correctness 
issue, and they can add a summary file later if they want. Basically, there's 
less annoyance and interruption in skipping the summary file than in failing 
the job near the end and forcing the user to re-run it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144097061
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link DataWriter}.
+ *
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createWriterFactory()}, 
serialize and send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create the data writer, and write the data of 
the partition with this
+ *  writer. If all the data are written successfully, call {@link 
DataWriter#commit()}. If
+ *  exception happens during the writing, call {@link 
DataWriter#abort()}.
+ *   3. If all writers are successfully committed, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some writers are aborted, or the job failed with an unknown 
reason, call
+ *  {@link #abort(WriterCommitMessage[])}.
+ *
+ * Spark won't retry failed writing jobs, users should do it manually in 
their Spark applications if
+ * they want to retry.
+ *
+ * Please refer to the document of commit/abort methods for detailed 
specifications.
+ *
+ * Note that, this interface provides a protocol between Spark and data 
sources for transactional
+ * data writing, but the transaction here is Spark-level transaction, 
which may not be the
+ * underlying storage transaction. For example, Spark successfully writes 
data to a Cassandra data
+ * source, but Cassandra may need some more time to reach consistency at 
storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to 
executors.
+   */
+  DataWriterFactory createWriterFactory();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * successful data writers and are produced by {@link 
DataWriter#commit()}. If this method
+   * fails(throw exception), this writing job is considered to be failed, 
and
+   * {@link #abort(WriterCommitMessage[])} will be called. The written 
data should only be visible
+   * to data source readers if this method successes.
+   *
+   * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
+   * Spark will pick the first successful one and get its commit message. 
Implementations should be
+   * aware of this and handle it correctly, e.g., have a mechanism to make 
sure only one data writer
+   * can commit successfully, or have a way to clean up the data of 
already-committed writers.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted,
+   * or the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])}
+   * fails. If this method fails(throw exception), the underlying data 
source may have garbage that
+   * need to be cleaned manually, but these garbage should not be visible 
to data source readers.
+   *
+   * Unless the abortion is triggered by the failure of commit, the given 
messages should have s
