[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter

2018-08-04 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21948
  
@cloud-fan, thanks for documenting the behavior and removing the default 
copy. I had a couple of questions, but I think it is close.


---




[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...

2018-08-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21948#discussion_r207722363
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
 ---
@@ -44,16 +46,16 @@ class MemorySinkV2Suite extends StreamTest with 
BeforeAndAfter {
 val writer = new MemoryStreamWriter(sink, OutputMode.Append(), new 
StructType().add("i", "int"))
 writer.commit(0,
   Array(
-MemoryWriterCommitMessage(0, Seq(InternalRow(1), InternalRow(2))),
-MemoryWriterCommitMessage(1, Seq(InternalRow(3), InternalRow(4))),
-MemoryWriterCommitMessage(2, Seq(InternalRow(6), InternalRow(7)))
+MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
--- End diff --

Why was this changed back to Row?


---




[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...

2018-08-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21948#discussion_r207722340
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -89,7 +89,8 @@ class RateStreamContinuousReader(options: 
DataSourceOptions) extends ContinuousR
 start.runTimeMs,
 i,
 numPartitions,
-perPartitionRate): InputPartition[InternalRow]
+perPartitionRate)
+.asInstanceOf[InputPartition[InternalRow]]
--- End diff --

Why is this cast necessary?


---




[GitHub] spark issue #21978: SPARK-25006: Add CatalogTableIdentifier.

2018-08-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21978
  
FYI @jzhuge


---




[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21977
  
@holdenk, I attempted to write a YARN unit test for this, but evidently the MiniYARNCluster doesn't run Python workers. The task runs, but a worker is never started. If you have any idea how to fix this, I think we could have an easy test. Here's what I have so far: 
https://gist.github.com/rdblue/9848a00f49eaad6126fbbcfa1b039e19


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r207690569
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
 ---
@@ -81,6 +82,17 @@ case class AggregateInPandasExec(
 
 val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
 val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val memoryMb = {
--- End diff --

I went ahead with the refactor.


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r207637946
  
--- Diff: python/pyspark/worker.py ---
@@ -259,6 +260,26 @@ def main(infile, outfile):
  "PYSPARK_DRIVER_PYTHON are correctly set.") %
 ("%d.%d" % sys.version_info[:2], version))
 
+# set up memory limits
+memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', 
"-1"))
+total_memory = resource.RLIMIT_AS
+try:
+(total_memory_limit, max_total_memory) = 
resource.getrlimit(total_memory)
+msg = "Current mem: {0} of max 
{1}\n".format(total_memory_limit, max_total_memory)
+sys.stderr.write()
+
+if memory_limit_mb > 0 and total_memory_limit < 0:
--- End diff --

I've updated to use `resource.RLIM_INFINITY`.

I think this should only set the resource limit if it isn't already set. It is unlikely that it's already set because this is during worker initialization, but the intent is to not cause harm if a higher-level system (e.g. a container provider) has already set the limit.


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r207636504
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
---
@@ -51,6 +52,17 @@ private[spark] class PythonRDD(
   val bufferSize = conf.getInt("spark.buffer.size", 65536)
   val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
 
+  val memoryMb = {
--- End diff --

I thought the comments below were clear: if a single worker is reused, it 
gets the entire allocation. If each core starts its own worker, each one gets 
an equal share.

If `reuseWorker` is actually ignored, then this needs to be updated.
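
For illustration, a minimal sketch of the split described above (the function and parameter names are illustrative, not the actual PythonRDD code):

```
// Hedged sketch: if workers are reused there is one worker per executor, so it gets
// the whole allocation; otherwise each core's worker gets an equal share.
def pysparkWorkerMemoryMb(
    totalPysparkMemoryMb: Option[Long],
    executorCores: Int,
    reuseWorker: Boolean): Option[Long] = {
  totalPysparkMemoryMb.map { total =>
    if (reuseWorker) total else math.max(total / executorCores, 1L)
  }
}
```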


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r207635841
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
 ---
@@ -81,6 +82,17 @@ case class AggregateInPandasExec(
 
 val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
 val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val memoryMb = {
--- End diff --

The other configuration options are already duplicated, so I was trying to 
make as few changes as possible.

Since there are several duplicated options, I think it makes more sense to 
pass the SparkConf through to PythonRunner so it can extract its own 
configuration.

@holdenk, would you like this refactor done in this PR, or should I do it 
in a follow-up?
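
For illustration, a rough sketch of the refactor being proposed, assuming PythonRunner could be handed the SparkConf directly (this is not the actual constructor):

```
import org.apache.spark.SparkConf

// Hedged sketch only: the idea is that the runner reads its own settings from the
// conf instead of every operator duplicating the same lookups.
class PythonRunnerSketch(conf: SparkConf) {
  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
  private val memoryMb = conf.getOption("spark.executor.pyspark.memory")
}
```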


---




[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21977
  
@ifilonenko, I opened follow-up SPARK-25021 for adding the PySpark memory 
allocation to Kubernetes. @mccheah, I opened follow-up SPARK-25022 for Mesos.


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r207631295
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -333,7 +340,7 @@ private[spark] class Client(
 val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
 logInfo("Verifying our application has not requested more than the 
maximum " +
   s"memory capability of the cluster ($maxMem MB per container)")
-val executorMem = executorMemory + executorMemoryOverhead
+val executorMem = executorMemory + executorMemoryOverhead + 
pysparkWorkerMemory
 if (executorMem > maxMem) {
   throw new IllegalArgumentException(s"Required executor memory 
($executorMemory" +
 s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem 
MB) of this cluster! " +
--- End diff --

I like having it broken out so users can see where their allocation is going. Otherwise, users who only know about `spark.executor.memory` might not understand why their allocation is 1 GB higher when running PySpark. I've updated this to include the worker memory.
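
As a rough sketch of the breakdown described here (variable names and the exact wording are illustrative, not the final Client.scala code):

```
// Hedged sketch: validate the full request and report each component so users can
// see where the allocation goes.
def checkExecutorMemory(
    executorMemory: Long,
    executorMemoryOverhead: Long,
    pysparkWorkerMemory: Long,
    maxMem: Long): Unit = {
  val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
  if (executorMem > maxMem) {
    throw new IllegalArgumentException(
      s"Required executor memory ($executorMemory MB), overhead ($executorMemoryOverhead MB), " +
      s"and PySpark memory ($pysparkWorkerMemory MB) is above the max threshold ($maxMem MB) " +
      "of this cluster!")
  }
}
```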


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r207630342
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -114,6 +114,10 @@ package object config {
 .checkValue(_ >= 0, "The off-heap memory size must not be negative")
 .createWithDefault(0)
 
+  private[spark] val PYSPARK_EXECUTOR_MEMORY = 
ConfigBuilder("spark.executor.pyspark.memory")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
--- End diff --

Fixed.


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21977#discussion_r207629997
  
--- Diff: python/pyspark/worker.py ---
@@ -259,6 +260,26 @@ def main(infile, outfile):
  "PYSPARK_DRIVER_PYTHON are correctly set.") %
 ("%d.%d" % sys.version_info[:2], version))
 
+# set up memory limits
+memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', 
"-1"))
+total_memory = resource.RLIMIT_AS
+try:
+(total_memory_limit, max_total_memory) = 
resource.getrlimit(total_memory)
+msg = "Current mem: {0} of max 
{1}\n".format(total_memory_limit, max_total_memory)
+sys.stderr.write()
--- End diff --

Fixed.


---




[GitHub] spark issue #21978: SPARK-25006: Add CatalogTableIdentifier.

2018-08-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21978
  
Retest this please.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207624290
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,98 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperators {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { tableAttr =>
+  query.resolveQuoted(tableAttr.name, resolver) match {
+case Some(queryExpr) =>
+  checkField(tableAttr, queryExpr, err => errors += err)
--- End diff --

I'd much rather pass functions than mutable state into other methods (side effects). Plus, a function is cleaner because it doesn't require a particular storage choice from the caller. If this were in a tight loop, there would be an argument for changing it, but this only happens once per plan.
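
A minimal sketch of the callback style being defended here (the names are illustrative, not the actual Analyzer code):

```
import scala.collection.mutable

object CheckFieldSketch {
  // Hedged sketch: the caller owns the error buffer and passes only a String => Unit
  // function, so checkField does not depend on any particular storage.
  def checkField(expected: String, actual: String, addError: String => Unit): Unit = {
    if (expected != actual) {
      addError(s"Cannot write '$actual' to column '$expected'")
    }
  }

  def main(args: Array[String]): Unit = {
    val errors = new mutable.ArrayBuffer[String]()
    checkField("id", "identifier", err => errors += err)
    println(errors.mkString("; "))
  }
}
```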


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207623469
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -352,6 +351,36 @@ case class Join(
   }
 }
 
+/**
+ * Append data to an existing table.
+ */
+case class AppendData(
+table: NamedRelation,
+query: LogicalPlan,
+isByName: Boolean) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
+  override def output: Seq[Attribute] = Seq.empty
+
+  override lazy val resolved: Boolean = {
+query.output.size == table.output.size && 
query.output.zip(table.output).forall {
+  case (inAttr, outAttr) =>
+  inAttr.name == outAttr.name &&// names must match
+  outAttr.dataType.sameType(inAttr.dataType) && // types must match
--- End diff --

Good catch, I agree.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207623351
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -352,6 +351,36 @@ case class Join(
   }
 }
 
+/**
+ * Append data to an existing table.
+ */
+case class AppendData(
+table: NamedRelation,
+query: LogicalPlan,
+isByName: Boolean) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
+  override def output: Seq[Attribute] = Seq.empty
+
+  override lazy val resolved: Boolean = {
+query.output.size == table.output.size && 
query.output.zip(table.output).forall {
+  case (inAttr, outAttr) =>
+  inAttr.name == outAttr.name &&// names must match
+  outAttr.dataType.sameType(inAttr.dataType) && // types must match
--- End diff --

Good catch.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207591742
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
@@ -336,4 +337,97 @@ object DataType {
   case (fromDataType, toDataType) => fromDataType == toDataType
 }
   }
+
+  /**
+   * Returns true if the write data type can be read using the read data 
type.
+   *
+   * The write type is compatible with the read type if:
+   * - Both types are arrays, the array element types are compatible, and 
element nullability is
+   *   compatible (read allows nulls or write does not contain nulls).
+   * - Both types are maps and the map key and value types are compatible, 
and value nullability
+   *   is compatible  (read allows nulls or write does not contain nulls).
+   * - Both types are structs and each field in the read struct is present 
in the write struct and
+   *   compatible (including nullability), or is nullable if the write 
struct does not contain the
+   *   field. Write-side structs are not compatible if they contain fields 
that are not present in
+   *   the read-side struct.
+   * - Both types are atomic and the write type can be safely cast to the 
read type.
+   *
+   * Extra fields in write-side structs are not allowed to avoid 
accidentally writing data that
+   * the read schema will not read, and to ensure map key equality is not 
changed when data is read.
+   *
+   * @param write a write-side data type to validate against the read type
+   * @param read a read-side data type
+   * @return true if data written with the write type can be read using 
the read type
+   */
+  def canWrite(
+  write: DataType,
+  read: DataType,
+  resolver: Resolver,
+  context: String,
+  addError: String => Unit = (_: String) => {}): Boolean = {
+(write, read) match {
+  case (wArr: ArrayType, rArr: ArrayType) =>
+if (wArr.containsNull && !rArr.containsNull) {
+  addError(s"Cannot write nullable elements to array of non-nulls: 
'$context'")
+  false
+} else {
+  canWrite(wArr.elementType, rArr.elementType, resolver, context + 
".element", addError)
+}
+
+  case (wMap: MapType, rMap: MapType) =>
+// map keys cannot include data fields not in the read schema 
without changing equality when
+// read. map keys can be missing fields as long as they are 
nullable in the read schema.
+if (wMap.valueContainsNull && !rMap.valueContainsNull) {
+  addError(s"Cannot write nullable values to map of non-nulls: 
'$context'")
+  false
+} else {
+  canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", 
addError) &&
+  canWrite(wMap.valueType, rMap.valueType, resolver, context + 
".value", addError)
+}
+
+  case (StructType(writeFields), StructType(readFields)) =>
+lazy val extraFields = writeFields.map(_.name).toSet -- 
readFields.map(_.name)
+
+var result = readFields.forall { readField =>
+  val fieldContext = context + "." + readField.name
+  writeFields.find(writeField => resolver(writeField.name, 
readField.name)) match {
--- End diff --

Yeah, I've been thinking about whether the rest of the code actually implements the rules that `canWrite` expects. This may not work, but I don't think it makes sense to add nested struct reordering in this commit if it isn't already supported.

Should we disable nested structures, or go ahead with this validation and 
fix reordering later?


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207590794
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,98 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
--- End diff --

Agreed. I'm already working on a suite for `canWrite` and will be adding 
better tests shortly.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207395603
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -352,6 +351,36 @@ case class Join(
   }
 }
 
+/**
+ * Append data to an existing table.
+ */
+case class AppendData(
+table: NamedRelation,
+query: LogicalPlan,
+isByName: Boolean) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
--- End diff --

I also see that `InsertIntoTable` doesn't list the relation as a child.


---




[GitHub] spark issue #21978: SPARK-25006: Add CatalogTableIdentifier.

2018-08-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21978
  
@gatorsmile and @cloud-fan, this adds catalog to `TableIdentifier` in 
preparation for multi-catalog support. `TableIdentifier` continues to work 
as-is to ensure that there are no behavior changes in code paths that do not 
have catalog support. I've updated `UnresolvedRelation` to demonstrate how 
migration to `CatalogTableIdentifier` will work.


---




[GitHub] spark pull request #21978: SPARK-25006: Add CatalogTableIdentifier.

2018-08-02 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21978

SPARK-25006: Add CatalogTableIdentifier.

## What changes were proposed in this pull request?

This adds `CatalogTableIdentifier`, which is an identifier that consists of 
a triple: catalog, database, and table. Catalog and database are optional.

The existing `TableIdentifier` class extends `CatalogTableIdentifier` and is guaranteed to have no defined catalog component. Classes that expect a `TableIdentifier` will continue to use `TableIdentifier` to ensure that catalogs are not leaked into code paths that do not support them.

This adds a parser rule, `catalogTableIdentifier`, that can parse 
identifiers with a catalog. An identifier with only two components will match 
database and table, leaving the catalog undefined. Only identifiers with three 
components will have a defined catalog. In addition, rules must be re-written 
to support `catalogTableIdentifier`. Existing rules will continue to use 
`tableIdentifier` with no catalog.
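
To illustrate the shape of the identifiers described above (a simplified sketch, not the actual classes in this PR):

```
// Hedged sketch: catalog and database are optional, table is required.
class CatalogTableIdentifierSketch(
    val table: String,
    val database: Option[String] = None,
    val catalog: Option[String] = None)

// TableIdentifier is described as the special case with no catalog component.
class TableIdentifierSketch(table: String, database: Option[String] = None)
  extends CatalogTableIdentifierSketch(table, database, catalog = None)
```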

## How was this patch tested?

Existing tests. This should not change any behavior.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-25006-add-catalog-to-table-identifier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21978.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21978


commit d61e0f1ccdce630d65c81ee27a233a14759415ea
Author: Ryan Blue 
Date:   2018-08-02T21:03:21Z

SPARK-25006: Add CatalogTableIdentifier.

This adds CatalogTableIdentifier, which is an identifier that consists
of a triple: catalog, database, and table. Catalog and database are
optional.

The existing TableIdentifier class extends CatalogTableIdentifier and is
guaranteed to have no defined catalog component. Classes that expect a
TableIdentifier should continue to use TableIdentifier to ensure that
catalogs are not leaked into code paths that do not support them.




---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
Retest this please


---




[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21977
  
@holdenk, can you help review this since it is related to PySpark?


---




[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...

2018-08-02 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21977

SPARK-25004: Add spark.executor.pyspark.memory limit.

## What changes were proposed in this pull request?

This adds `spark.executor.pyspark.memory` to configure Python's address 
space limit, 
[`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS).
 Limiting Python's address space allows Python to participate in memory 
management. In practice, we see fewer cases of Python taking too much memory 
because it doesn't know to run garbage collection. This results in YARN killing 
fewer containers. This also improves error messages so users know that Python 
is consuming too much memory:

```
  File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in 
fe_engineer
fe_eval_rec.update(f(src_rec_prep, mat_rec_prep))
  File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in 
fe_comp
comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, 
[]), mat_rec_prep.get(item, []))
  File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, 
in leven_list_compare
permutations = sorted(permutations, reverse=True)
  MemoryError
```

The new PySpark memory setting is used to increase the requested YARN container memory, instead of sharing overhead memory between Python and off-heap JVM activity.
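
For example, a hedged sketch of how the two settings could be combined (the values are illustrative):

```
import org.apache.spark.SparkConf

// With this change, the YARN container request becomes executor memory + overhead +
// the PySpark allocation, rather than Python sharing the overhead space.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.pyspark.memory", "2g")
```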

## How was this patch tested?

Tested memory limits in our YARN cluster and verified that MemoryError is 
thrown.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-25004-add-python-memory-limit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21977


commit 19cd9c5cce4420729074a0976b129889d70fd56c
Author: Ryan Blue 
Date:   2018-05-09T18:34:50Z

SPARK-25004: Add spark.executor.pyspark.memory limit.




---




[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter

2018-08-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21948
  
I'm changing my +1 to -1 because read-side changes are mixed in and because 
copies are the responsibility of data sources if they buffer and hold 
references to earlier rows.


---




[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...

2018-08-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21948#discussion_r207295461
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -89,8 +89,7 @@ class RateStreamContinuousReader(options: 
DataSourceOptions) extends ContinuousR
 start.runTimeMs,
 i,
 numPartitions,
-perPartitionRate)
-.asInstanceOf[InputPartition[InternalRow]]
+perPartitionRate): InputPartition[InternalRow]
--- End diff --

This should be in a separate commit. I didn't notice yesterday that this PR is for the writer until it was linked from the other issue. I think this change needs to get in, but it should not be mixed into changes for the write path.


---




[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...

2018-08-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21948#discussion_r207294283
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 ---
@@ -50,4 +50,15 @@
*this ID will always be 0.
*/
   DataWriter createDataWriter(int partitionId, long taskId, long 
epochId);
+
+  /**
+   * When true, Spark will reuse the same data object instance when 
sending data to the data writer,
+   * for better performance. Data writers should carefully handle the data 
objects if it's reused,
+   * e.g. do not buffer the data objects in a list. By default it returns 
false for safety, data
+   * sources can override it if their data writers immediately write the 
data object to somewhere
+   * else like a memory buffer or disk.
+   */
+  default boolean reuseDataObject() {
--- End diff --

I don't think this should be added in this commit. This commit is to move to `InternalRow` and should not alter the API. I'm fine documenting this, but writers are responsible for defensive copies if necessary. This default is going to make sources slower, and I don't think it is necessary for implementations other than tests that buffer data in memory.
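
A minimal sketch of the responsibility described here, assuming a writer that buffers rows (this is not the actual DataWriter interface):

```
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow

// Hedged sketch: because Spark may reuse the same InternalRow instance between calls,
// a writer that holds references to earlier rows must copy them before buffering.
class BufferingWriterSketch {
  private val buffered = ArrayBuffer.empty[InternalRow]

  def write(row: InternalRow): Unit = {
    // defensive copy; a writer that flushes each row immediately would not need this
    buffered += row.copy()
  }
}
```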


---




[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...

2018-08-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21948#discussion_r207293465
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 ---
@@ -50,4 +50,15 @@
*this ID will always be 0.
*/
   DataWriter createDataWriter(int partitionId, long taskId, long 
epochId);
+
+  /**
+   * When true, Spark will reuse the same data object instance when 
sending data to the data writer,
+   * for better performance. Data writers should carefully handle the data 
objects if it's reused,
+   * e.g. do not buffer the data objects in a list. By default it returns 
false for safety, data
--- End diff --

No, Iceberg assumes that data objects are reused.


---




[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...

2018-08-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21911
  
I'd like to fix the AnalysisException message, and I noted one small nit in the tests.

+1 when the AnalysisException message is fixed.


---




[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint ...

2018-08-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21911#discussion_r207285870
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
 ---
@@ -17,15 +17,25 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 
 class ResolveHintsSuite extends AnalysisTest {
   import org.apache.spark.sql.catalyst.analysis.TestRelations._
 
+  private def intercept(plan: LogicalPlan, messages: String*): Unit = {
--- End diff --

Since this is an assertion and `intercept` is a well-known method, can you 
rename it to `assertErrorMessage` or something more descriptive?


---




[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint ...

2018-08-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21911#discussion_r207285266
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -102,6 +104,35 @@ object ResolveHints {
 }
   }
 
+  /**
+   * COALESCE Hint accepts name "COALESCE" and "REPARTITION".
+   * Its parameter includes a partition number.
+   */
+  class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] {
+private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION")
+
+private def applyCoalesceHint(
+  plan: LogicalPlan,
+  numPartitions: Int,
+  shuffle: Boolean): LogicalPlan = {
+  Repartition(numPartitions, shuffle, plan)
+}
+
+def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+  case h: UnresolvedHint if 
COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+h.parameters match {
+  case Seq(Literal(numPartitions: Int, IntegerType)) =>
+val shuffle = h.name.toUpperCase(Locale.ROOT) match {
+  case "REPARTITION" => true
+  case "COALESCE" => false
+}
+applyCoalesceHint(h.child, numPartitions, shuffle)
+  case _ =>
+throw new AnalysisException("COALESCE Hint expects a partition 
number as parameter")
--- End diff --

Can you use `h.name.toUpperCase` in this error message instead? I think 
that would be a better message for users that don't know the relationship 
between COALESCE and REPARTITION.
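
A hedged sketch of the suggested message, echoing whichever hint name the user actually wrote:

```
import java.util.Locale

// Illustrative only: the hint name would come from the unresolved hint node (h.name).
def coalesceHintError(hintName: String): String =
  s"${hintName.toUpperCase(Locale.ROOT)} Hint expects a partition number as parameter"
```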


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, I'll fix the conflicts and re-run tests. Yesterday's tests 
passed after I updated for your feedback. I'd like to try to get this in soon 
because it is taking so much time to resolve conflicts without any real changes.

FYI @gatorsmile, @bersprockets, @jzhuge


---




[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21946
  
+1


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207043490
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,100 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if !DataType.canWrite(outAttr.dataType, 
inAttr.dataType, resolver) =>
+  Some(upcast(inAttr, outAttr))
+
+case Some(inAttr) =>
+  Some(inAttr) // matches nullability, datatype, and name
+
+case _ =>
+  errors += s"Cannot find data for output column 
'${outAttr.name}'"
+  None
+  }
+}
+
+  } else {
+if (expected.size > query.output.size) {
+  throw new AnalysisException(
+s"""Cannot write to '$tableName', not enough data columns:
+   |Table columns: ${expected.map(_.name).mkString(", ")}
+   |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+}
+
+query.output.zip(expected).flatMap {
+  case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
+errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+None
+
+  case (inAttr, outAttr)
+if !DataType.canWrite(inAttr.dataType, outAttr.dataType, 
resolver) ||
--- End diff --

I updated this to use your suggestion: now it always adds the cast.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207043344
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,100 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if !DataType.canWrite(outAttr.dataType, 
inAttr.dataType, resolver) =>
+  Some(upcast(inAttr, outAttr))
+
+case Some(inAttr) =>
+  Some(inAttr) // matches nullability, datatype, and name
+
+case _ =>
+  errors += s"Cannot find data for output column 
'${outAttr.name}'"
+  None
+  }
+}
+
+  } else {
+if (expected.size > query.output.size) {
+  throw new AnalysisException(
+s"""Cannot write to '$tableName', not enough data columns:
+   |Table columns: ${expected.map(_.name).mkString(", ")}
+   |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+}
+
+query.output.zip(expected).flatMap {
--- End diff --

I refactored these into a helper method.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207043203
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,100 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
--- End diff --

I fixed this by always failing if `canWrite` returns false and always 
adding the `UpCast`.

Now, `canWrite` will return true if the write type can be cast to the read 
type for atomic types, as determined by `Cast.canSafeCast`. Since it only 
returns a boolean, we always insert the cast and the optimizer should remove it 
if it isn't needed.

I also added better error messages. When an error is found, the check will 
add a clear error message by calling `addError: String => Unit`.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r207015951
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,100 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if !DataType.canWrite(outAttr.dataType, 
inAttr.dataType, resolver) =>
+  Some(upcast(inAttr, outAttr))
+
+case Some(inAttr) =>
+  Some(inAttr) // matches nullability, datatype, and name
+
+case _ =>
+  errors += s"Cannot find data for output column 
'${outAttr.name}'"
+  None
+  }
+}
+
+  } else {
+if (expected.size > query.output.size) {
+  throw new AnalysisException(
+s"""Cannot write to '$tableName', not enough data columns:
+   |Table columns: ${expected.map(_.name).mkString(", ")}
+   |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+}
+
+query.output.zip(expected).flatMap {
+  case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
+errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+None
+
+  case (inAttr, outAttr)
+if !DataType.canWrite(inAttr.dataType, outAttr.dataType, 
resolver) ||
--- End diff --

Sorry, I thought you were asking whether upcast is necessary. We could 
probably always upcast.


---




[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21921
  
Yeah, I'd say that if it isn't documented then let's go with the usual RTC conventions.


---




[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21948
  
+1


---




[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21948#discussion_r207006871
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -89,8 +89,7 @@ class RateStreamContinuousReader(options: 
DataSourceOptions) extends ContinuousR
 start.runTimeMs,
 i,
 numPartitions,
-perPartitionRate)
-.asInstanceOf[InputPartition[InternalRow]]
+perPartitionRate): InputPartition[InternalRow]
--- End diff --

Sounds good to me.


---




[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21946
  
Yeah, I'm fine with this, then.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r206979097
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
@@ -38,15 +38,16 @@
* If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
* submitted.
*
-   * @param jobId A unique string for the writing job. It's possible that 
there are many writing
-   *  jobs running at the same time, and the returned {@link 
DataSourceWriter} can
-   *  use this job id to distinguish itself from other jobs.
+   * @param writeUUID A unique string for the writing job. It's possible 
that there are many writing
+   *  jobs running at the same time, and the returned 
{@link DataSourceWriter} can
+   *  use this job id to distinguish itself from other 
jobs.
* @param schema the schema of the data to be written.
* @param mode the save mode which determines what to do when the data 
are already in this data
* source, please refer to {@link SaveMode} for more details.
* @param options the options for the returned data source writer, which 
is an immutable
*case-insensitive string-to-string map.
+   * @return a writer to append data to this data source
--- End diff --

The data source API only handles writes/appends and reads. The high-level logical plans combine append writes with other operations.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r206978690
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
@@ -38,15 +38,16 @@
* If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
* submitted.
*
-   * @param jobId A unique string for the writing job. It's possible that 
there are many writing
-   *  jobs running at the same time, and the returned {@link 
DataSourceWriter} can
-   *  use this job id to distinguish itself from other jobs.
+   * @param writeUUID A unique string for the writing job. It's possible 
that there are many writing
--- End diff --

This is removed in the v2 API redesign.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r206978506
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -352,6 +351,36 @@ case class Join(
   }
 }
 
+/**
+ * Append data to an existing table.
+ */
+case class AppendData(
+table: NamedRelation,
+query: LogicalPlan,
+isByName: Boolean) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
--- End diff --

What transforms would be enabled by making `table` a child? Do we want to 
transform the relation? It is fixed so I didn't think that was a good idea.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r206978261
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,100 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if !DataType.canWrite(outAttr.dataType, 
inAttr.dataType, resolver) =>
+  Some(upcast(inAttr, outAttr))
+
+case Some(inAttr) =>
+  Some(inAttr) // matches nullability, datatype, and name
+
+case _ =>
+  errors += s"Cannot find data for output column 
'${outAttr.name}'"
+  None
+  }
+}
+
+  } else {
+if (expected.size > query.output.size) {
+  throw new AnalysisException(
+s"""Cannot write to '$tableName', not enough data columns:
+   |Table columns: ${expected.map(_.name).mkString(", ")}
+   |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+}
+
+query.output.zip(expected).flatMap {
+  case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
+errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+None
+
+  case (inAttr, outAttr)
+if !DataType.canWrite(inAttr.dataType, outAttr.dataType, 
resolver) ||
--- End diff --

The contract of `DataType.canWrite` is that the data written is compatible 
with the table's read schema. It should allow promotion from `int` to `long` 
and `float` to `double` and then use upcast here to write the correct type. I 
think this is a problem with `canWrite`, not the upcast.
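
A simplified (non-Spark) sketch of the promotion rule described, just to make the intent concrete:

```
// Hedged sketch over toy atomic types: a write type is acceptable when it equals the
// read type or safely widens to it, which is what canWrite should allow before the
// plan inserts the cast.
sealed trait AtomicT
case object IntT extends AtomicT
case object LongT extends AtomicT
case object FloatT extends AtomicT
case object DoubleT extends AtomicT

def canPromote(write: AtomicT, read: AtomicT): Boolean = (write, read) match {
  case (w, r) if w == r  => true
  case (IntT, LongT)     => true  // int -> long is a safe widening
  case (FloatT, DoubleT) => true  // float -> double is a safe widening
  case _                 => false
}
```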


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r206977289
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,100 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if !DataType.canWrite(outAttr.dataType, 
inAttr.dataType, resolver) =>
+  Some(upcast(inAttr, outAttr))
+
+case Some(inAttr) =>
+  Some(inAttr) // matches nullability, datatype, and name
+
+case _ =>
+  errors += s"Cannot find data for output column 
'${outAttr.name}'"
+  None
+  }
+}
+
+  } else {
+if (expected.size > query.output.size) {
+  throw new AnalysisException(
+s"""Cannot write to '$tableName', not enough data columns:
+   |Table columns: ${expected.map(_.name).mkString(", ")}
+   |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+}
+
+query.output.zip(expected).flatMap {
--- End diff --

This handles both append cases, write by name and write by position. This 
block is checking by position. I'll see if I can refactor the checks into a 
private method.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r206976856
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2217,6 +2218,100 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
--- End diff --

This is done by `DataType.canWrite`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21946
  
@cloud-fan, from your comment around the same time as mine, it sounds like 
the confusion may just be in how you're updating the current API to the 
proposed one. Can you post a migration plan? It sounds like something like this:

`ReadSupport` and `ReadSupportWithSchema` -> `BatchReadSupportProvider`
`DataSourceReader` -> `ReadSupport`

Is that right? The re-use of `ReadSupport` would explain the confusion on 
my end.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21946
  
Isn't this unnecessary after the API redesign?

For the redesign, the `DataSourceV2` or a `ReadSupportProvider` will supply 
a `create` method (or `anonymousTable`) to return a `Table` that implements 
`ReadSupport`. `ReadSupport` should not accept user schemas because the schema 
should be accessible from the `Table` itself. That way, we can use the same 
table-based relation (see 
https://github.com/apache/spark/pull/21877/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23R82).
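
Roughly, the shape being argued for looks like the sketch below; the trait names and signatures are illustrative placeholders, not the final API:

```scala
import org.apache.spark.sql.types.StructType

// Illustrative placeholders only: the provider resolves any user-supplied
// schema up front and returns a Table; the Table exposes its schema and the
// read support, so ReadSupport itself never needs to accept a schema.
trait Table {
  def schema: StructType
}

trait ReadSupport {
  def planScan(): Unit  // stand-in for whatever the real scan planning returns
}

trait TableProvider {
  def getTable(userSchema: Option[StructType]): Table with ReadSupport
}
```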


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21921
  
@cloud-fan, @gatorsmile, I'm fine with that if it's documented somewhere. I 
wasn't aware of that convention and no one brought it up the last time I 
pointed out commits without a committer +1.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21921
  
This looks fine other than the possibly unnecessary cast.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206946453
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -416,12 +405,12 @@ class AdvancedDataSourceV2 extends DataSourceV2 with 
ReadSupport {
   requiredSchema
 }
 
-override def planRowInputPartitions(): JList[InputPartition[Row]] = {
-  val lowerBound = filters.collect {
+override def planInputPartitions(): JList[InputPartition[InternalRow]] 
= {
+  val lowerBound = filters.collectFirst {
--- End diff --

Fine by me since it is so small, just wanted to point it out.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206946335
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -121,17 +121,6 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
 }
   }
 
-  test("unsafe row scan implementation") {
-Seq(classOf[UnsafeRowDataSourceV2], 
classOf[JavaUnsafeRowDataSourceV2]).foreach { cls =>
--- End diff --

Ok.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206946259
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
 ---
@@ -169,7 +170,7 @@ class RateStreamMicroBatchReader(options: 
DataSourceOptions, checkpointLocation:
 (0 until numPartitions).map { p =>
   new RateStreamMicroBatchInputPartition(
 p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, 
relativeMsPerValue)
-: InputPartition[Row]
+: InputPartition[InternalRow]
--- End diff --

This is fine since it isn't a cast, but it's generally better to check 
whether these are still necessary after refactoring.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206946076
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -91,7 +90,7 @@ class RateStreamContinuousReader(options: 
DataSourceOptions)
 i,
 numPartitions,
 perPartitionRate)
-.asInstanceOf[InputPartition[Row]]
+.asInstanceOf[InputPartition[InternalRow]]
--- End diff --

I don't think it's a good idea to leave casts. Can you check to see if this 
can be avoided? I found in #21118 that many of the casts were unnecessary if 
variables had declared types and it is much better to avoid explicit casts that 
work around the type system.
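
A self-contained (non-Spark) illustration of the point about declared types:

```scala
// Toy types standing in for InputPartition and an implementation of it.
trait Partition[T]
class IntPartition extends Partition[Int]

object DeclaredTypeExample {
  // The declared element type lets the compiler verify the assignment,
  // so no asInstanceOf escape hatch is needed.
  val partitions: Seq[Partition[Int]] = (0 until 4).map(_ => new IntPartition)
}
```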


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206936701
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -456,57 +445,20 @@ class AdvancedInputPartition(start: Int, end: Int, 
requiredSchema: StructType)
 current < end
   }
 
-  override def get(): Row = {
+  override def get(): InternalRow = {
 val values = requiredSchema.map(_.name).map {
   case "i" => current
   case "j" => -current
 }
-Row.fromSeq(values)
+InternalRow.fromSeq(values)
   }
 }
 
 
-class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
--- End diff --

These aren't Row implementations. Why remove them?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206936422
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -416,12 +405,12 @@ class AdvancedDataSourceV2 extends DataSourceV2 with 
ReadSupport {
   requiredSchema
 }
 
-override def planRowInputPartitions(): JList[InputPartition[Row]] = {
-  val lowerBound = filters.collect {
+override def planInputPartitions(): JList[InputPartition[InternalRow]] 
= {
+  val lowerBound = filters.collectFirst {
--- End diff --

Nit: this is an unrelated change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206936202
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -121,17 +121,6 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
 }
   }
 
-  test("unsafe row scan implementation") {
-Seq(classOf[UnsafeRowDataSourceV2], 
classOf[JavaUnsafeRowDataSourceV2]).foreach { cls =>
--- End diff --

Why remove unsafe tests?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206935616
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
 ---
@@ -169,7 +170,7 @@ class RateStreamMicroBatchReader(options: 
DataSourceOptions, checkpointLocation:
 (0 until numPartitions).map { p =>
   new RateStreamMicroBatchInputPartition(
 p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, 
relativeMsPerValue)
-: InputPartition[Row]
+: InputPartition[InternalRow]
--- End diff --

Is this needed? Doesn't RateStreamMicroBatchInputPartition implement 
InputPartition[InternalRow]?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...

2018-08-01 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21921#discussion_r206935176
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -91,7 +90,7 @@ class RateStreamContinuousReader(options: 
DataSourceOptions)
 i,
 numPartitions,
 perPartitionRate)
-.asInstanceOf[InputPartition[Row]]
+.asInstanceOf[InputPartition[InternalRow]]
--- End diff --

Why is this cast necessary?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21921
  
@cloud-fan, I thought it was a requirement to have a committer +1 before 
merging. Or is this [list of 
committers](https://spark.apache.org/committers.html) out of date?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, I'll look into the test failures tomorrow, but this has been 
passing tests for weeks so I think it is still safe to review when you have 
time. We can fix both in parallel so that we can get validated writes in 2.4.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries

2018-07-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21911#discussion_r206741523
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -660,6 +660,62 @@ class PlanParserSuite extends AnalysisTest {
 )
   }
 
+  test ("insert hint syntax") {
+assertEqual(
+  "INSERT INTO s /*+ COALESCE(10) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("COALESCE", Seq(Literal(10)),
+  table("t").select(star())), false, false))
+assertEqual(
+  "INSERT INTO TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)),
+  table("t").select(star())), false, false))
+assertEqual(
+  "INSERT INTO s /*+ REPARTITION(100) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("REPARTITION", Seq(Literal(100)),
+  table("t").select(star())), false, false))
+assertEqual(
+  "INSERT INTO TABLE s /*+ REPARTITION(20, false) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)),
+  table("t").select(star())), false, false))
+assertEqual(
+  "INSERT OVERWRITE TABLE s /*+ COALESCE(10) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("COALESCE", Seq(Literal(10)),
+  table("t").select(star())), true, false))
+assertEqual(
+  "INSERT OVERWRITE TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)),
+  table("t").select(star())), true, false))
+assertEqual(
+  "INSERT OVERWRITE TABLE s /*+ REPARTITION(100) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("REPARTITION", Seq(Literal(100)),
+  table("t").select(star())), true, false))
+assertEqual(
+  "INSERT OVERWRITE TABLE s /*+ REPARTITION(20, false) */ SELECT * 
FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)),
+  table("t").select(star())), true, false))
+
+// Multiple hints
+assertEqual(
+  "INSERT INTO s /*+ REPARTITION(100), COALESCE(50, true), 
COALESCE(10) */ SELECT * FROM t",
+  InsertIntoTable(table("s"), Map.empty,
+UnresolvedHint("REPARTITION", Seq(Literal(100)),
+  UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)),
+UnresolvedHint("COALESCE", Seq(Literal(10)),
+  table("t").select(star(), false, false))
+
+// Wrong hint location
+intercept("INSERT INTO /*+ COALESCE(10) */ s SELECT * FROM t",
+  "extraneous input '/*+' expecting")
--- End diff --

I don't think it is necessary to test the wrong hint location because there 
are so many wrong hint locations and the error is just a generic parse error. 
If there were a friendly error message, that would be worth a test case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries

2018-07-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21911#discussion_r206741178
  
--- Diff: 
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -332,7 +332,7 @@ resource
 ;
 
 queryNoWith
-: insertInto? queryTerm queryOrganization  
#singleInsertQuery
+: (insertInto (hints+=hint)*)? queryTerm queryOrganization 
#singleInsertQuery
--- End diff --

Why allow more than one coalesce hint? Is this useful in some way?

I think it is confusing to users to allow multiple hints because the order 
in which they are applied to the plan isn't clear. Is it left-to-right order? 
Would `REPARTITION(100), COALESCE(10)` repartition and then coalesce?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries

2018-07-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21911#discussion_r206740826
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -102,6 +104,39 @@ object ResolveHints {
 }
   }
 
+  /**
+   * For coalesce hint, we accept "COALESCE" and "REPARTITION".
+   * Its parameters include a partition number and an optional boolean to 
indicate
+   * whether shuffle is allowed.
+   */
+  class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] {
+private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION")
+
+private def applyCoalesceHint(
+  plan: LogicalPlan,
+  numPartitions: Int,
+  shuffle: Boolean): LogicalPlan = {
+  Repartition(numPartitions, shuffle, plan)
+}
+
+def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+  case h: UnresolvedHint if 
COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+h.parameters match {
+  case Seq(Literal(i, IntegerType)) =>
+val defaultShuffle = h.name.toUpperCase(Locale.ROOT) match {
+  case "REPARTITION" => true
+  case _ => false
+}
+applyCoalesceHint(h.child, i.asInstanceOf[Int], defaultShuffle)
--- End diff --

Instead of casting, this should add the expected type to the pattern match.
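
A sketch of that suggestion (the helper name is invented for illustration): bind the value with a typed pattern so the cast disappears.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.IntegerType

object HintParamExample {
  // The typed pattern `n: Int` binds the literal's value directly, so
  // `i.asInstanceOf[Int]` is no longer needed.
  def numPartitions(params: Seq[Expression]): Option[Int] = params match {
    case Seq(Literal(n: Int, IntegerType)) => Some(n)
    case _ => None
  }
}
```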


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries

2018-07-31 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21911#discussion_r206689783
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -102,6 +104,39 @@ object ResolveHints {
 }
   }
 
+  /**
+   * For coalesce hint, we accept "COALESCE" and "REPARTITION".
+   * Its parameters include a partition number and an optional boolean to 
indicate
+   * whether shuffle is allowed.
+   */
+  class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] {
+private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION")
+
+private def applyCoalesceHint(
+  plan: LogicalPlan,
+  numPartitions: Int,
+  shuffle: Boolean): LogicalPlan = {
+  Repartition(numPartitions, shuffle, plan)
+}
+
+def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+  case h: UnresolvedHint if 
COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
--- End diff --

I think this should match `COALESCE(num)` or `REPARTITION(num)` and drop 
the boolean.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries

2018-07-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21911
  
@jzhuge, I think it is confusing that this hint exposes the `shuffle` 
boolean flag. The Spark API makes a clear distinction between `repartition` and 
`coalesce` where `coalesce` means that Spark won't add a shuffle. Adding the 
boolean here would allow users to coalesce with a shuffle (repartition) or 
repartition without a shuffle (coalesce). I can't think of a good reason to do 
that. The reason why there is a boolean `shuffle` param is to use the same plan 
node internally, but I don't think that detail should be leaked into SQL hints.
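
For reference, a small sketch of the distinction as the DataFrame API already draws it (the local session settings here are invented for the example):

```scala
import org.apache.spark.sql.SparkSession

object CoalesceVsRepartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("hint-demo").getOrCreate()
    val df = spark.range(1000).toDF("id")

    // coalesce never adds a shuffle; it can only reduce the partition count.
    val narrowed = df.coalesce(10)
    // repartition always shuffles to reach exactly the requested count.
    val shuffled = df.repartition(100)

    println((narrowed.rdd.getNumPartitions, shuffled.rdd.getNumPartitions))
    spark.stop()
  }
}
```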


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-31 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, @gatorsmile, this has been ready for final review for a while. 
Do you think you'll have some time to look at it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21308: SPARK-24253: Add DeleteSupport mix-in for DataSourceV2.

2018-07-26 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21308
  
#21888 shows how this is used to implement DELETE FROM.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-07-26 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
#21888 demonstrates how to add a table identifier that carries a catalog 
element, `CatalogTableIdentifier`, and how to migrate safely from the old 
identifier to the new catalog-aware one.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21888: [SPARK-24253][SQL][WIP] Implement DeleteFrom for ...

2018-07-26 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21888

[SPARK-24253][SQL][WIP] Implement DeleteFrom for v2 tables

## What changes were proposed in this pull request?

This adds support for DELETE FROM in SQL using the new DeleteFrom logical 
plan and v2 DeleteSupport mix-in.

To identify the v2 table to delete data from, this uses the TableCatalog 
API introduced in #21306.

TableIdentifier has been updated with a new superclass, 
CatalogTableIdentifier. Code paths that don't support identifiers with a 
catalog continue to use TableIdentifier, allowing a smooth transition even 
though some code assumes there is no support for multiple catalogs.

UnresolvedRelation now supports CatalogTableIdentifier, but resolution will 
only happen when the catalog is not defined, or through a new rule that 
understands the v2 TableCatalog. Existing uses of UnresolvedRelation access the 
identifier through tableIdentifier, which asserts that no catalog is defined 
before returning, so catalog identifiers cannot leak into code paths without 
catalog support.
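
A rough sketch of the identifier hierarchy described above; the class shapes are illustrative, not the exact code in this PR:

```scala
// Illustrative only: the catalog-aware identifier is the superclass, and the
// existing identifier is the catalog-free special case, so old code paths
// keep compiling against TableIdentifier unchanged.
class CatalogTableIdentifier(
    val table: String,
    val database: Option[String],
    val catalog: Option[String])

class TableIdentifier(table: String, database: Option[String])
  extends CatalogTableIdentifier(table, database, catalog = None)

object IdentifierGuard {
  // Code without catalog support can demand the old identifier and assert
  // that no catalog was specified, so catalog identifiers cannot leak in.
  def asTableIdentifier(ident: CatalogTableIdentifier): TableIdentifier = {
    require(ident.catalog.isEmpty, s"Table ${ident.table} requires catalog support")
    new TableIdentifier(ident.table, ident.database)
  }
}
```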

WIP: This is based on #21306, #21305, and #21877 and includes the changes 
from those PRs.

## How was this patch tested?

WIP, will add tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark SPARK-24253-add-delete-from

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21888.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21888


commit dc977d26acb29731f62addaec67ea6dbe077e670
Author: Ryan Blue 
Date:   2018-05-05T01:13:01Z

SPARK-24252: Add v2 data source mix-in for catalog support.

commit 7d75da5e6abc2bf90b8cf7f530412d6b01ee6938
Author: Ryan Blue 
Date:   2018-05-11T21:27:47Z

SPARK-24252: Add copyright headers.

commit 3be3b7c320c7f9b68270112c9d3f7fa0a1ff377c
Author: Ryan Blue 
Date:   2018-07-04T17:02:52Z

SPARK-24252: Update for review comments.

* Rename CatalogSupport to TableSupport
* Rename DataSourceCatalog to TableCatalog
* Remove name and database from Table

commit 7ad0c8e8fe45f1b85011fc91dd162dba3951593e
Author: Ryan Blue 
Date:   2018-07-04T17:19:45Z

SPARK-24252: Add TableChange example to Javadocs.

commit 3de2cfa3d069aa905c2c5076acc8d59b5ee0c63e
Author: Ryan Blue 
Date:   2018-07-25T18:11:22Z

SPARK-24252: Update catalog API in org.apache.spark.sql.catalog.v2.

commit ae61a797f584fba8ca2601b76f80774d22c3845c
Author: Ryan Blue 
Date:   2018-07-25T19:25:32Z

SPARK-24252: Add tests and clarify javadoc.

commit 8b135607512fda53ff5adc34ac8b965b04f3a3d9
Author: Ryan Blue 
Date:   2018-07-25T20:00:23Z

SPARK-24252: Fix Map.Entry reference causing doc failures.

commit b4c505758a5137a311ac098965ace19af33bd198
Author: Ryan Blue 
Date:   2018-07-26T05:07:52Z

SPARK-24252: Move catalog.v2 classes into catalyst.

This is needed for catalyst plans to use catalogs.

commit 85a49d729e0ae75cf6e1725216829c7d5e79a8a1
Author: Ryan Blue 
Date:   2018-05-07T15:54:37Z

SPARK-24251: Add AppendData logical plan.

This adds a new logical plan, AppendData, that was proposed in
SPARK-23521. This also adds an analyzer rule to validate data written
with AppendData against the target table. DataFrameWriter is also
updated so that v2 writes use the new AppendData logical plan.

commit 770a31cf43903736b10455a82b4743987dd27db6
Author: Ryan Blue 
Date:   2018-05-11T22:04:15Z

SPARK-24253: Add DeleteSupport mix-in for DataSourceV2.

commit f769341547cdc63f77456c1c2677070a4d3d363c
Author: Ryan Blue 
Date:   2018-05-11T22:09:22Z

SPARK-24253: Add interface description to DeleteSupport.

commit af16c4238fd486a82e86dda02ad21279104afe21
Author: Ryan Blue 
Date:   2018-06-13T23:08:21Z

Add DeleteFrom logical plan, add to parser.

commit 88c12a47cbc3eea04ed9d90e91c62285de594b7e
Author: Ryan Blue 
Date:   2018-07-25T18:11:45Z

Add CTAS and RTAS support.

This uses the catalog API introduced in SPARK-24252 to implement CTAS
and RTAS plans.

commit 19b83bd796406ccc69ed8a138ff7652bcb6049d1
Author: Ryan Blue 
Date:   2018-07-26T22:02:25Z

SPARK-24253: Implement DELETE FROM for v2 tables.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-07-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r205571827
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.internal.SQLConf;
+
+/**
+ * A marker interface to provide a catalog implementation for Spark.
+ * 
+ * Implementations can provide catalog functions by implementing 
additional interfaces, like
+ * {@link TableCatalog} to expose table operations.
+ * 
+ * Catalog implementations must implement this marker interface to be 
loaded by
+ * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate 
catalog classes using the
+ * required public no-arg constructor. After creating an instance, it will 
be configured by calling
+ * {@link #initialize(CaseInsensitiveStringMap)}.
+ * 
+ * Catalog implementations are registered to a name by adding a 
configuration option to Spark:
+ * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. 
All configuration properties
+ * in the Spark configuration that share the catalog name prefix,
+ * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in 
the case insensitive
+ * string map of options in initialization with the prefix removed. An 
additional property,
+ * {@code name}, is also added to the options and will contain the 
catalog's name; in this case,
+ * "catalog-name".
+ */
+public interface CatalogProvider {
+  /**
+   * Called to initialize configuration.
+   * 
+   * This method is called once, just after the provider is instantiated.
+   *
+   * @param options a case-insensitive string map of configuration
+   */
+  void initialize(CaseInsensitiveStringMap options);
--- End diff --

That's a Scala map and the v2 APIs are intended to be used with both Java 
and Scala. My intent is to reuse this map in place of DataSourceOptions, so at 
least we will reduce some duplication.
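
For context, the registration and initialization flow documented above would look roughly like this; the catalog name and implementation class are invented for the example:

```scala
import org.apache.spark.sql.SparkSession

object CatalogConfigExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("catalog-config")
      // registers a catalog named "prod" backed by a hypothetical class
      .config("spark.sql.catalog.prod", "com.example.ProdCatalog")
      .config("spark.sql.catalog.prod.uri", "thrift://metastore:9083")
      .getOrCreate()

    // Per the documented contract, ProdCatalog.initialize(options) would
    // receive a case-insensitive map of {"uri" -> "thrift://...", "name" -> "prod"}.
    spark.stop()
  }
}
```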


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #12313: [SPARK-14543] [SQL] Improve InsertIntoTable column resol...

2018-07-26 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/12313
  
This is addressed by #21305.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12313: [SPARK-14543] [SQL] Improve InsertIntoTable colum...

2018-07-26 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/spark/pull/12313


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21237: [SPARK-23325][WIP] Test parquet returning internal row

2018-07-26 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21237
  
This is no longer needed. #21118 fixes the copy problem by always inserting 
a projection that copies, but delaying until after filters are run.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21237: [SPARK-23325][WIP] Test parquet returning interna...

2018-07-26 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/spark/pull/21237


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21877: [SPARK-24923][SQL][WIP] Add unpartitioned CTAS and RTAS ...

2018-07-25 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21877
  
@cloud-fan, @gatorsmile, @marmbrus, this PR demonstrates how plans would 
use the catalog changes introduced in #21306. To see the changes, you may want 
to look at just the last commit because this includes changes from other PRs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21877: [SPARK-24923][SQL][WIP] Add unpartitioned CTAS an...

2018-07-25 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21877

[SPARK-24923][SQL][WIP] Add unpartitioned CTAS and RTAS support for 
DataSourceV2

## What changes were proposed in this pull request?

* Remove extends from `ReadSupport` and `WriteSupport` classes for use with 
`Table`
* Add CTAS and RTAS logical plans
* Refactor physical write plans so AppendData, CTAS, and RTAS use the same 
base class
* Add support for `TableCatalog` to `DataFrameReader` and `DataFrameWriter`
* Add `TableV2Relation` for tables that are loaded by `TableCatalog` and 
have no `DataSource` instance
* Move implicit helpers into `DataSourceV2Implicits` to avoid future churn

Note that this doesn't handle `partitionBy` in `DataFrameWriter`. Adding 
support for partitioned tables will require validation rules.

This is based on unmerged work and includes the commits from #21306 and 
#21305.

## How was this patch tested?

Adding unit tests for CTAS and RTAS.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark add-ctas-rtas-v2-plans

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21877.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21877


commit 8372f5bb47a0d6269bb16b3dc16f6f3278d2f5fd
Author: Ryan Blue 
Date:   2018-05-05T01:13:01Z

SPARK-24252: Add v2 data source mix-in for catalog support.

commit 1238af73872b0105d0c5dfbbd8da5c8f18afe408
Author: Ryan Blue 
Date:   2018-05-07T15:54:37Z

SPARK-24251: Add AppendData logical plan.

This adds a new logical plan, AppendData, that was proposed in
SPARK-23521. This also adds an analyzer rule to validate data written
with AppendData against the target table. DataFrameWriter is also
updated so that v2 writes use the new AppendData logical plan.

commit d308d3c75f78242c822eab6d11fb651d94f10aa6
Author: Ryan Blue 
Date:   2018-07-25T18:11:45Z

Add CTAS and RTAS support.

This uses the catalog API introduced in SPARK-24252 to implement CTAS
and RTAS plans.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-07-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r205237682
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -609,6 +611,12 @@ class SparkSession private(
*/
   @transient lazy val catalog: Catalog = new CatalogImpl(self)
 
+  @transient private lazy val catalogs = new mutable.HashMap[String, 
CatalogProvider]()
+
+  private[sql] def catalog(name: String): CatalogProvider = synchronized {
--- End diff --

Note that this is `private[sql]`. This allows us to use the named 
`TableCatalog` instances without solving how multiple catalogs should be 
exposed to users through a public API just yet.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-07-25 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@marmbrus, @cloud-fan, @gatorsmile, I've updated this PR to use reflection 
to instantiate catalogs. This allows implementations to provide named catalogs 
(and reuse implementations) and configure those catalogs with Spark 
configuration properties.

FYI @bersprockets, @felixcheung, @jzhuge
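
A hedged sketch of the reflection-based loading; the trait below is a stand-in and the helper name is invented, so the real `Catalogs.load` differs in details:

```scala
// Stand-in for the provider interface, just so the sketch is self-contained.
trait CatalogProvider {
  def initialize(options: Map[String, String]): Unit
}

object CatalogLoaderSketch {
  def load(name: String, conf: Map[String, String]): CatalogProvider = {
    val className = conf.getOrElse(
      s"spark.sql.catalog.$name",
      throw new IllegalArgumentException(s"No implementation configured for catalog '$name'"))

    // Instantiate through the required public no-arg constructor.
    val provider = Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[CatalogProvider]

    // Pass every property under the catalog's prefix, plus the name itself.
    val prefix = s"spark.sql.catalog.$name."
    val options = conf.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }
    provider.initialize(options + ("name" -> name))
    provider
  }
}
```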


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, @gatorsmile, is it possible to get this in for 2.4? This 
validates writes to data source tables so I think it is a good one to have.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
Retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-07-24 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
Thanks for reviewing and merging @cloud-fan, @gatorsmile, @felixcheung!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for table cat...

2018-07-23 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@cloud-fan, @gatorsmile, I don't think this should be merged yet.

I've been implementing CTAS and RTAS based on this commit and I don't think 
it makes sense to get a `TableCatalog` instance from the data source. The data 
source should be determined by the catalog, not the other way around.

Otherwise, we could have a case where a `test` catalog uses the `parquet` 
source, but that `parquet` source would return a `prod` catalog for its 
`TableCatalog` because `prod` is the default. If catalogs can reuse data 
sources, then the catalog should be determined first.

FYI @bersprockets, @felixcheung, @henryr 
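
A short sketch of the catalog-first direction argued for here; the interfaces are placeholders, not the PR's API:

```scala
// Placeholder interfaces only. The catalog is looked up first (for example by
// name from configuration) and it decides how tables are created and loaded;
// nothing asks a data source to hand back "its" catalog.
trait Table
trait TableCatalog {
  def loadTable(name: String): Table
  def createTable(name: String, properties: Map[String, String]): Table
}

object CatalogFirst {
  def resolve(catalogs: Map[String, TableCatalog], catalogName: String, table: String): Table =
    catalogs(catalogName).loadTable(table)
}
```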


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-07-23 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
@cloud-fan, any update on merging this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-07-20 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
Rebased on master to fix conflicts.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...

2018-07-20 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21118#discussion_r204150008
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -53,7 +54,7 @@ class KafkaContinuousReader(
 metadataPath: String,
 initialOffsets: KafkaOffsetRangeLimit,
 failOnDataLoss: Boolean)
-  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
--- End diff --

We can, but this is intended to make minimal changes. We can add 
optimizations like this in a follow-up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...

2018-07-20 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21118#discussion_r204149889
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -125,16 +125,13 @@ object DataSourceV2Strategy extends Strategy {
   val filterCondition = postScanFilters.reduceLeftOption(And)
   val withFilter = filterCondition.map(FilterExec(_, 
scan)).getOrElse(scan)
 
-  val withProjection = if (withFilter.output != project) {
-ProjectExec(project, withFilter)
-  } else {
-withFilter
-  }
-
-  withProjection :: Nil
+  // always add the projection, which will produce unsafe rows 
required by some operators
+  ProjectExec(project, withFilter) :: Nil
 
 case r: StreamingDataSourceV2Relation =>
-  DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, 
r.reader) :: Nil
+  // ensure there is a projection, which will produce unsafe rows 
required by some operators
+  ProjectExec(r.output,
--- End diff --

It is perfectly fine for sources to produce UnsafeRow because it is an 
InternalRow.

I think it is important for us to get to InternalRow in this release. 
UnsafeRow is too hard to produce and the easiest thing to do is to produce 
InternalRow and then call into Spark's UnsafeProjection to produce UnsafeRow. 
That's painful, uses internal APIs, and is slower.
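
To make the contrast concrete, a small sketch (the schema and values are invented for the example):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object RowProductionExample {
  val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))

  // Producing an InternalRow is straightforward for a source implementation.
  val row: InternalRow = new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))

  // Producing UnsafeRow means reaching into internal APIs for a projection
  // and paying for the extra conversion.
  val toUnsafe = UnsafeProjection.create(schema)
  val unsafeRow = toUnsafe(row)
}
```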


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-17 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, I've rebased this so it is ready for final review when you get 
a chance. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...

2018-07-14 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21556
  
I misunderstood how it was safe as well. It was Yuming's clarification that 
helped.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21741: [SPARK-24718][SQL] Timestamp support pushdown to parquet...

2018-07-13 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21741
  
+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21741: [SPARK-24718][SQL] Timestamp support pushdown to ...

2018-07-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21741#discussion_r202451540
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -378,6 +378,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED =
+buildConf("spark.sql.parquet.filterPushdown.timestamp")
+  .doc("If true, enables Parquet filter push-down optimization for 
Timestamp. " +
+"This configuration only has an effect when 
'spark.sql.parquet.filterPushdown' is " +
+"enabled and Timestamp stored as TIMESTAMP_MICROS or 
TIMESTAMP_MILLIS type.")
--- End diff --

I would just note that push-down doesn't work for INT96 timestamps in the 
file. It should work for the others.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21741: [SPARK-24718][SQL] Timestamp support pushdown to ...

2018-07-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21741#discussion_r202451374
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -378,6 +378,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED =
+buildConf("spark.sql.parquet.filterPushdown.timestamp")
+  .doc("If true, enables Parquet filter push-down optimization for 
Timestamp. " +
+"This configuration only has an effect when 
'spark.sql.parquet.filterPushdown' is " +
+"enabled and Timestamp stored as TIMESTAMP_MICROS or 
TIMESTAMP_MILLIS type.")
+.internal()
+.booleanConf
+.createWithDefault(true)
--- End diff --

Because we're using the file schema, it doesn't matter what the write 
configuration is; it only matters what it was when the file was written. If the 
file has an INT96 timestamp, this should simply not push anything down.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...

2018-07-13 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21556
  
+1, I think this looks ready to go.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...

2018-07-13 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21556
  
@HyukjinKwon, even when the values are null, the makeEq function only casts 
null to a Java Integer, so the handling is still safe; it just looks odd that 
`null.asInstanceOf[JInt]` works. Thanks to @wangyum for explaining it. Even if 
a null-safe equality predicate contains a null value, this should be handled 
correctly.

And, passing null in an equals predicate is supported by Parquet.
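
A tiny demonstration of why the cast is safe (not Spark code):

```scala
object NullCastExample extends App {
  // Casting null to a boxed Java type yields a null reference; nothing throws.
  val boxed: java.lang.Integer = null.asInstanceOf[java.lang.Integer]
  assert(boxed == null)

  // By contrast, casting null to the primitive Int silently becomes 0, which
  // is one reason the boxed JInt is the safer choice in the filter code.
  assert(null.asInstanceOf[Int] == 0)
}
```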


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202448032
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  length: Int,
+  decimalMeta: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, 
null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
 
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
+  private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
+
+  private def decimalToInt64(decimal: JBigDecimal): JLong = 
decimal.unscaledValue().longValue()
+
+  private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): 
Binary = {
+val decimalBuffer = new Array[Byte](numBytes)
+val bytes = decimal.unscaledValue().toByteArray
+
+val fixedLengthBytes = if (bytes.length == numBytes) {
+  bytes
+} else {
+  val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+  java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, 
signByte)
+  System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, 
bytes.length)
+  decimalBuffer
+}
+Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
+  }
+
   private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
--- End diff --

Sounds good. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202447544
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)
Rate(M/s)   Per Row(ns)   Relative
 

-Parquet Vectorized3785 / 3867  4.2 
240.6   1.0X
-Parquet Vectorized (Pushdown) 3820 / 3928  4.1 
242.9   1.0X
-Native ORC Vectorized 3981 / 4049  4.0 
253.1   1.0X
-Native ORC Vectorized (Pushdown)   702 /  735 22.4 
 44.6   5.4X
+Parquet Vectorized4407 / 4852  3.6 
280.2   1.0X
+Parquet Vectorized (Pushdown) 1602 / 1634  9.8 
101.8   2.8X
--- End diff --

Okay, I see. The tenths and hundredths are always 0, which makes the 
precision-8 numbers actually precision-10. It is still odd that this is causing 
Parquet to have no stats, but I'm happy with the fix. Thanks for explaining.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...

2018-07-12 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21556
  
@wangyum, can you explain what was happening with the `decimal(9,2)` 
benchmark more clearly? I asked additional questions, but the thread is on a 
line that changed so it's collapsed by default.

Also, `valueCanMakeFilterOn` returns true for all null values, so I think 
we still have a problem there. Conversion from EqualNullSafe needs to support 
null filter values.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202093955
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
 val nameToType = getFieldMap(schema)
 
+def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): 
Boolean = value match {
+  case decimal: JBigDecimal =>
+decimal.scale == decimalMeta.getScale
+  case _ => false
+}
+
+// Since SPARK-24716, ParquetFilter accepts parquet file schema to 
convert to
+// data source Filter. This must make sure that filter value matched 
the Filter.
+// If doesn't matched, then the schema used to read the file is 
incorrect,
+// which would cause data corruption.
+def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+  value == null || (nameToType(name) match {
+case ParquetBooleanType => value.isInstanceOf[JBoolean]
+case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+case ParquetLongType => value.isInstanceOf[JLong]
+case ParquetFloatType => value.isInstanceOf[JFloat]
+case ParquetDoubleType => value.isInstanceOf[JDouble]
+case ParquetStringType => value.isInstanceOf[String]
+case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+case ParquetDateType => value.isInstanceOf[Date]
+case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) =>
--- End diff --

Have you tried not using `|` and ignoring the physical type with `_`?
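
A self-contained illustration of the suggestion (toy types, not the PR's ParquetSchemaType):

```scala
// Toy stand-in for the schema descriptor: match on the logical type once and
// ignore the physical type with a wildcard instead of enumerating alternatives.
case class SchemaType(logicalType: String, physicalType: String, precision: Int)

object DecimalMatchExample {
  def isDecimal(t: SchemaType): Boolean = t match {
    case SchemaType("DECIMAL", _, _) => true  // any physical representation
    case _                           => false
  }
}
```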


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


