spark git commit: [SPARK-25133][SQL][DOC] Avro data source guide

2018-08-22 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 1747469a1 -> 05974f943


[SPARK-25133][SQL][DOC] Avro data source guide

## What changes were proposed in this pull request?

Create documentation for the Avro data source.
The new page will be linked in 
https://spark.apache.org/docs/latest/sql-programming-guide.html

For preview please unzip the following file:
[AvroDoc.zip](https://github.com/apache/spark/files/2313011/AvroDoc.zip)

Closes #22121 from gengliangwang/avroDoc.

Authored-by: Gengliang Wang 
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05974f94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05974f94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05974f94

Branch: refs/heads/master
Commit: 05974f9431e9718a5f331a9892b7d81aca8387a6
Parents: 1747469
Author: Gengliang Wang 
Authored: Thu Aug 23 13:45:49 2018 +0800
Committer: Wenchen Fan 
Committed: Thu Aug 23 13:45:49 2018 +0800

--
 docs/avro-data-source-guide.md | 380 
 docs/sql-programming-guide.md  |   3 +
 2 files changed, 383 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/05974f94/docs/avro-data-source-guide.md
--
diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md
new file mode 100644
index 000..d3b81f0
--- /dev/null
+++ b/docs/avro-data-source-guide.md
@@ -0,0 +1,380 @@
+---
+layout: global
+title: Apache Avro Data Source Guide
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Since the Spark 2.4 release, [Spark 
SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides 
built-in support for reading and writing Apache Avro data.
+
+## Deploying
+The `spark-avro` module is external and not included in `spark-submit` or 
`spark-shell` by default.
+
+As with any Spark application, `spark-submit` is used to launch your 
application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be directly added to `spark-submit` using 
`--packages`, such as,
+
+./bin/spark-submit --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+For experimenting on `spark-shell`, you can also use `--packages` to add 
`org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its 
dependencies directly,
+
+./bin/spark-shell --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+See [Application Submission Guide](submitting-applications.html) for more 
details about submitting applications with external dependencies.
+
+## Load and Save Functions
+
+Since the `spark-avro` module is external, there is no `.avro` API in 
+`DataFrameReader` or `DataFrameWriter`.
+
+To load/save data in Avro format, you need to specify the data source option 
`format` as `avro` (or `org.apache.spark.sql.avro`).
+
+
+{% highlight scala %}
+
+val usersDF = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
+usersDF.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+Dataset usersDF = 
spark.read().format("avro").load("examples/src/main/resources/users.avro");
+usersDF.select("name", 
"favorite_color").write().format("avro").save("namesAndFavColors.avro");
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
+df.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight r %}
+
+df <- read.df("examples/src/main/resources/users.avro", "avro")
+write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", 
"avro")
+
+{% endhighlight %}
+
+
+
+## to_avro() and from_avro()
+The Avro package provides the function `to_avro()` to encode a column as binary in Avro 
+format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column into 
+another column, and the input/output SQL data type can be a complex type or a primitive type.
+
+Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each 
+Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.
+* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
+* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka (see the sketch below).
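The doc excerpt is truncated at this point in the message. As a usage sketch of the pattern just described (decode Avro-encoded Kafka values with `from_avro`, re-encode with `to_avro`), assuming a running `spark` session with the Kafka connector on the classpath, an Avro schema file `user.avsc`, and placeholder broker addresses and topic names:

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro._          // from_avro / to_avro
import org.apache.spark.sql.functions.col

// from_avro requires the Avro schema as a JSON string.
val jsonFormatSchema = new String(
  Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

// Read Avro-encoded values from Kafka, decode, filter, and re-encode.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

val output = df
  .select(from_avro(col("value"), jsonFormatSchema) as "user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")) as "value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```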

svn commit: r28901 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_20_01-1747469-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-22 Thread pwendell
Author: pwendell
Date: Thu Aug 23 03:15:47 2018
New Revision: 28901

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_22_20_01-1747469 docs


[This commit notification would consist of 1477 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-25167][SPARKR][TEST][MINOR] Minor fixes for R sql tests

2018-08-22 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 0295ad40d -> 1747469a1


[SPARK-25167][SPARKR][TEST][MINOR] Minor fixes for R sql tests

## What changes were proposed in this pull request?
A few SQL tests for R were failing in my development environment. In this PR, I 
attempt to address some of them. Below are the reasons for the failures.

- The catalog API tests assume catalog artifacts named "foo" to be non-existent. 
Names such as foo and bar are common and I use them frequently, so I have 
changed them to a string that I hope is less likely to collide.
- One test assumes that there is only one database in the system. I had more 
than one and it caused the test to fail. I have changed that check.
- One more test, which compares two timestamp values, fails; I am debugging it 
now and may send the fix as a follow-up.

## How was this patch tested?
It's a test fix.

Closes #22161 from dilipbiswal/r-sql-test-fix1.

Authored-by: Dilip Biswal 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1747469a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1747469a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1747469a

Branch: refs/heads/master
Commit: 1747469a1ff0b0ab6c5545fe6de63ffe42660580
Parents: 0295ad4
Author: Dilip Biswal 
Authored: Thu Aug 23 10:56:17 2018 +0800
Committer: hyukjinkwon 
Committed: Thu Aug 23 10:56:17 2018 +0800

--
 R/pkg/tests/fulltests/test_sparkSQL.R | 20 +++-
 1 file changed, 11 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1747469a/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index bff6e35..e1f3cf3 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -734,8 +734,8 @@ test_that("test cache, uncache and clearCache", {
   clearCache()
   expect_true(dropTempView("table1"))
 
-  expect_error(uncacheTable("foo"),
-  "Error in uncacheTable : analysis error - Table or view not found: foo")
+  expect_error(uncacheTable("zxwtyswklpf"),
+  "Error in uncacheTable : analysis error - Table or view not found: 
zxwtyswklpf")
 })
 
 test_that("insertInto() on a registered table", {
@@ -3632,11 +3632,11 @@ test_that("Collect on DataFrame when NAs exists at the 
top of a timestamp column
 test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
   expect_equal(currentDatabase(), "default")
   expect_error(setCurrentDatabase("default"), NA)
-  expect_error(setCurrentDatabase("foo"),
-   "Error in setCurrentDatabase : analysis error - Database 'foo' 
does not exist")
+  expect_error(setCurrentDatabase("zxwtyswklpf"),
+"Error in setCurrentDatabase : analysis error - Database 'zxwtyswklpf' 
does not exist")
   dbs <- collect(listDatabases())
   expect_equal(names(dbs), c("name", "description", "locationUri"))
-  expect_equal(dbs[[1]], "default")
+  expect_equal(which(dbs[, 1] == "default"), 1)
 })
 
 test_that("catalog APIs, listTables, listColumns, listFunctions", {
@@ -3659,8 +3659,9 @@ test_that("catalog APIs, listTables, listColumns, 
listFunctions", {
   expect_equal(colnames(c),
c("name", "description", "dataType", "nullable", "isPartition", 
"isBucket"))
   expect_equal(collect(c)[[1]][[1]], "speed")
-  expect_error(listColumns("foo", "default"),
-   "Error in listColumns : analysis error - Table 'foo' does not exist in 
database 'default'")
+  expect_error(listColumns("zxwtyswklpf", "default"),
+   paste("Error in listColumns : analysis error - Table",
+ "'zxwtyswklpf' does not exist in database 'default'"))
 
   f <- listFunctions()
   expect_true(nrow(f) >= 200) # 250
@@ -3668,8 +3669,9 @@ test_that("catalog APIs, listTables, listColumns, 
listFunctions", {
c("name", "database", "description", "className", 
"isTemporary"))
   expect_equal(take(orderBy(f, "className"), 1)$className,
"org.apache.spark.sql.catalyst.expressions.Abs")
-  expect_error(listFunctions("foo_db"),
-   "Error in listFunctions : analysis error - Database 'foo_db' 
does not exist")
+  expect_error(listFunctions("zxwtyswklpf_db"),
+   paste("Error in listFunctions : analysis error - Database",
+ "'zxwtyswklpf_db' does not exist"))
 
   # recoverPartitions does not work with tempory view
   expect_error(recoverPartitions("cars"),





spark git commit: [SPARK-25127] DataSourceV2: Remove SupportsPushDownCatalystFilters

2018-08-22 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 2bc7b7553 -> 0295ad40d


[SPARK-25127] DataSourceV2: Remove SupportsPushDownCatalystFilters

## What changes were proposed in this pull request?
They depend on internal Expression APIs. Let's see how far we can get without 
them.

## How was this patch tested?
Just some code removal. There are no existing tests as far as I can tell, so 
it's easy to remove.

Closes #22185 from rxin/SPARK-25127.

Authored-by: Reynold Xin 
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0295ad40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0295ad40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0295ad40

Branch: refs/heads/master
Commit: 0295ad40def41b9a8ccefaaa1a7658899fb632a4
Parents: 2bc7b75
Author: Reynold Xin 
Authored: Thu Aug 23 08:10:45 2018 +0800
Committer: Wenchen Fan 
Committed: Thu Aug 23 08:10:45 2018 +0800

--
 .../reader/SupportsPushDownCatalystFilters.java | 57 
 .../v2/reader/SupportsPushDownFilters.java  |  4 --
 .../datasources/v2/DataSourceV2Strategy.scala   |  5 --
 3 files changed, 66 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0295ad40/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
deleted file mode 100644
index 9d79a18..000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.expressions.Expression;
-
-/**
- * A mix-in interface for {@link ScanConfigBuilder}. Data source readers can 
implement this
- * interface to push down arbitrary expressions as predicates to the data 
source.
- * This is an experimental and unstable interface as {@link Expression} is not 
public and may get
- * changed in the future Spark versions.
- *
- * Note that, if data source readers implement both this interface and
- * {@link SupportsPushDownFilters}, Spark will ignore {@link 
SupportsPushDownFilters} and only
- * process this interface.
- */
-@InterfaceStability.Unstable
-public interface SupportsPushDownCatalystFilters extends ScanConfigBuilder {
-
-  /**
-   * Pushes down filters, and returns filters that need to be evaluated after 
scanning.
-   */
-  Expression[] pushCatalystFilters(Expression[] filters);
-
-  /**
-   * Returns the catalyst filters that are pushed to the data source via
-   * {@link #pushCatalystFilters(Expression[])}.
-   *
-   * There are 3 kinds of filters:
-   *  1. pushable filters which don't need to be evaluated again after 
scanning.
-   *  2. pushable filters which still need to be evaluated after scanning, 
e.g. parquet
-   * row group filter.
-   *  3. non-pushable filters.
-   * Both case 1 and 2 should be considered as pushed filters and should be 
returned by this method.
-   *
-   * It's possible that there is no filters in the query and
-   * {@link #pushCatalystFilters(Expression[])} is never called, empty array 
should be returned for
-   * this case.
-   */
-  Expression[] pushedCatalystFilters();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0295ad40/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index 5d32a8a..5e7985f 100644
--- 

svn commit: r28899 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_16_02-49a1993-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-22 Thread pwendell
Author: pwendell
Date: Wed Aug 22 23:16:07 2018
New Revision: 28899

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_22_16_02-49a1993 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-24785][SHELL] Making sure REPL prints Spark UI info and then Welcome message

2018-08-22 Thread dbtsai
Repository: spark
Updated Branches:
  refs/heads/master 49a1993b1 -> 2bc7b7553


[SPARK-24785][SHELL] Making sure REPL prints Spark UI info and then Welcome 
message

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/21495, the welcome message is printed 
first, and then the Scala prompt is shown before the Spark UI info is printed.

Although it's a minor issue, visually it doesn't look as nice as the existing 
behavior. This PR intends to fix it by duplicating Scala's `process` code to 
arrange the printing order. However, one variable is private, so reflection has 
to be used, which is not desirable.

We can use this PR to brainstorm how to handle it properly and how Scala could 
change its APIs to fit our needs.
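As an illustration of the reflection workaround described above, here is a minimal, hedged sketch of writing to a private member from outside its class; the field name is a placeholder, not the actual variable touched by this patch:

```scala
// Hedged sketch: forcing access to a private field via Java reflection.
// `fieldName` is illustrative; the real patch targets a private member of ILoop.
def setPrivateField(target: AnyRef, fieldName: String, value: AnyRef): Unit = {
  val field = target.getClass.getDeclaredField(fieldName)
  field.setAccessible(true) // bypass the private modifier
  field.set(target, value)
}
```

This is exactly why the description calls the approach undesirable: it couples the REPL to Scala's internal field layout and silently breaks if the field is renamed.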

## How was this patch tested?

Existing test

Closes #21749 from dbtsai/repl-followup.

Authored-by: DB Tsai 
Signed-off-by: DB Tsai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bc7b755
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bc7b755
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bc7b755

Branch: refs/heads/master
Commit: 2bc7b75537ec81184048738883b282e257cc58de
Parents: 49a1993
Author: DB Tsai 
Authored: Wed Aug 22 23:14:56 2018 +
Committer: DB Tsai 
Committed: Wed Aug 22 23:14:56 2018 +

--
 .../org/apache/spark/repl/SparkILoop.scala  | 138 ++-
 .../spark/repl/SparkILoopInterpreter.scala  |  18 +--
 2 files changed, 138 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2bc7b755/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
--
diff --git 
a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index a44051b..9426526 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -22,8 +22,16 @@ import java.io.BufferedReader
 // scalastyle:off println
 import scala.Predef.{println => _, _}
 // scalastyle:on println
+import scala.concurrent.Future
+import scala.reflect.classTag
+import scala.reflect.internal.util.ScalaClassLoader.savingContextLoader
+import scala.reflect.io.File
+import scala.tools.nsc.{GenericRunnerSettings, Properties}
 import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
+import scala.tools.nsc.interpreter.{isReplDebug, isReplPower, replProps}
+import scala.tools.nsc.interpreter.{AbstractOrMissingHandler, ILoop, IMain, 
JPrintWriter}
+import scala.tools.nsc.interpreter.{NamedParam, SimpleReader, SplashLoop, 
SplashReader}
+import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain
 import scala.tools.nsc.util.stringFromStream
 import scala.util.Properties.{javaVersion, javaVmName, versionString}
 
@@ -36,7 +44,7 @@ class SparkILoop(in0: Option[BufferedReader], out: 
JPrintWriter)
   def this() = this(None, new JPrintWriter(Console.out, true))
 
   override def createInterpreter(): Unit = {
-intp = new SparkILoopInterpreter(settings, out, initializeSpark)
+intp = new SparkILoopInterpreter(settings, out)
   }
 
   val initializationCommands: Seq[String] = Seq(
@@ -116,6 +124,132 @@ class SparkILoop(in0: Option[BufferedReader], out: 
JPrintWriter)
 super.replay()
   }
 
+  /**
+   * The following code is mostly a copy of `process` implementation in 
`ILoop.scala` in Scala
+   *
+   * In newer version of Scala, `printWelcome` is the first thing to be 
called. As a result,
+   * SparkUI URL information would be always shown after the welcome message.
+   *
+   * However, this is inconsistent compared with the existing version of Spark 
which will always
+   * show SparkUI URL first.
+   *
+   * The only way we can make it consistent will be duplicating the Scala code.
+   *
+   * We should remove this duplication once Scala provides a way to load our 
custom initialization
+   * code, and also customize the ordering of printing welcome message.
+   */
+  override def process(settings: Settings): Boolean = savingContextLoader {
+
+def newReader = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, 
interactive = true))
+
+/** Reader to use before interpreter is online. */
+def preLoop = {
+  val sr = SplashReader(newReader) { r =>
+in = r
+in.postInit()
+  }
+  in = sr
+  SplashLoop(sr, prompt)
+}
+
+/* Actions to cram in parallel while collecting first user input at prompt.
+ * Run with output muted both from ILoop and from the intp reporter.
+ */
+def loopPostInit(): Unit = mumly {
+  // Bind intp somewhere out of the regular 

spark git commit: [SPARK-25163][SQL] Fix flaky test: o.a.s.util.collection.ExternalAppendOnlyMapSuiteCheck

2018-08-22 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 310632498 -> 49a1993b1


[SPARK-25163][SQL] Fix flaky test: 
o.a.s.util.collection.ExternalAppendOnlyMapSuiteCheck

## What changes were proposed in this pull request?

The `ExternalAppendOnlyMapSuiteCheck` test is flaky.

We use a `SparkListener` to collect the spill metrics of completed stages. 
`withListener` runs the code that does the spill. The spill status was checked 
after that code finished, but the check was still inside `withListener`; at 
that point, not all events on the listener bus had necessarily been processed.

We should check the spill status only after all events have been processed.
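Restated as a compact sketch (names follow the diff below, where `SpillListener` and `withListener` live in `TestUtils`): the assertion now runs only after `withListener` returns, i.e. after the listener bus has been drained.

```scala
// Sketch of the fixed pattern from the diff below: create the listener up
// front, run the spilling job inside withListener, and assert only afterwards,
// once all SparkListener events have been delivered.
def assertSpilled(sc: SparkContext, identifier: String)(body: => Unit): Unit = {
  val listener = new SpillListener
  withListener(sc, listener) { _ =>
    body  // job expected to spill
  }       // per the description above, events are fully processed by this point
  assert(listener.numSpilledStages > 0, s"expected $identifier to spill, but did not")
}
```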

## How was this patch tested?

Ran unit tests locally.

Closes #22181 from viirya/SPARK-25163.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49a1993b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49a1993b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49a1993b

Branch: refs/heads/master
Commit: 49a1993b168accb6f188c682546f12ea568173c4
Parents: 3106324
Author: Liang-Chi Hsieh 
Authored: Wed Aug 22 14:17:05 2018 -0700
Committer: Shixiong Zhu 
Committed: Wed Aug 22 14:17:05 2018 -0700

--
 core/src/main/scala/org/apache/spark/TestUtils.scala | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/49a1993b/core/src/main/scala/org/apache/spark/TestUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 6cc8fe1..c2ebd38 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -173,10 +173,11 @@ private[spark] object TestUtils {
* Run some code involving jobs submitted to the given context and assert 
that the jobs spilled.
*/
   def assertSpilled(sc: SparkContext, identifier: String)(body: => Unit): Unit 
= {
-withListener(sc, new SpillListener) { listener =>
+val listener = new SpillListener
+withListener(sc, listener) { _ =>
   body
-  assert(listener.numSpilledStages > 0, s"expected $identifier to spill, 
but did not")
 }
+assert(listener.numSpilledStages > 0, s"expected $identifier to spill, but 
did not")
   }
 
   /**
@@ -184,10 +185,11 @@ private[spark] object TestUtils {
* did not spill.
*/
   def assertNotSpilled(sc: SparkContext, identifier: String)(body: => Unit): 
Unit = {
-withListener(sc, new SpillListener) { listener =>
+val listener = new SpillListener
+withListener(sc, listener) { _ =>
   body
-  assert(listener.numSpilledStages == 0, s"expected $identifier to not 
spill, but did")
 }
+assert(listener.numSpilledStages == 0, s"expected $identifier to not 
spill, but did")
   }
 
   /**





spark git commit: [SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState

2018-08-22 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 68ec4d641 -> 310632498


[SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky 
test in FlatMapGroupsWithState

## What changes were proposed in this pull request?

The race condition that caused the test failure is between two threads.
- The MicroBatchExecution thread that processes inputs to produce answers and 
then generates progress events.
- The test thread that generates some input data, checks the answer, and then 
verifies the progress event generated by the query.

The synchronization structure between these threads is as follows.
1. The MicroBatchExecution thread, in every batch, does the following in order:
   a. Processes the batch input to generate the answer.
   b. Signals `awaitProgressLockCondition` to wake up threads waiting for 
progress using `awaitOffset`.
   c. Generates the progress event.

2. The test execution thread:
   a. Calls `awaitOffset` to wait for progress, which waits on 
`awaitProgressLockCondition`.
   b. As soon as `awaitProgressLockCondition` is signaled, moves on in the test 
to check the answer.
   c. Finally, verifies the last generated progress event.

What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b -> 2c 
-> 1c.
In other words, the progress event may be generated after the test tries to 
verify it.

The solution has two steps.
1. Signal the waiting thread after the progress event has been generated, that 
is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a 
large value.

The latter ensures that the test thread keeps waiting on 
`awaitProgressLockCondition` until the MicroBatchExecution thread explicitly 
signals it. With the existing small timeout of 100 ms, the following sequence 
can occur:
 - The MicroBatchExecution thread updates the committed offsets.
 - The test thread waiting on `awaitProgressLockCondition` accidentally times 
out after 100 ms, finds that the committed offsets have been updated, and 
therefore returns from `awaitOffset` and moves on to the progress event checks.
 - The MicroBatchExecution thread then generates the progress event and 
signals. But the test thread has already attempted to verify the event and failed.

By increasing the timeout to a large value (e.g., `streamingTimeoutMs = 60 seconds`, 
similar to `awaitInitialization`), the above type of race condition is also 
avoided.
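A hedged sketch of the resulting lock/condition ordering (placeholder method names, not the actual `StreamExecution` code):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Placeholders standing in for the real MicroBatchExecution work.
def processBatch(): Unit = ()       // 1a: process inputs, produce the answer
def emitProgressEvent(): Unit = ()  // generate the progress event

val awaitProgressLock = new ReentrantLock()
val awaitProgressLockCondition = awaitProgressLock.newCondition()

// MicroBatchExecution side: signal waiters only after the progress event exists.
def runOneBatch(): Unit = {
  processBatch()
  emitProgressEvent()  // previously this happened after the signal
  awaitProgressLock.lock()
  try awaitProgressLockCondition.signalAll()
  finally awaitProgressLock.unlock()
}

// Test side: wait with a long timeout so the explicit signal, not an accidental
// 100 ms timeout, is what wakes the thread up.
def awaitProgress(timeoutMs: Long): Unit = {
  awaitProgressLock.lock()
  try awaitProgressLockCondition.await(timeoutMs, TimeUnit.MILLISECONDS)
  finally awaitProgressLock.unlock()
}
```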

## How was this patch tested?
Ran locally many times.

Closes #22182 from tdas/SPARK-25184.

Authored-by: Tathagata Das 
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31063249
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31063249
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31063249

Branch: refs/heads/master
Commit: 3106324986612800240bc8c945be90c4cb368d79
Parents: 68ec4d6
Author: Tathagata Das 
Authored: Wed Aug 22 12:22:53 2018 -0700
Committer: Tathagata Das 
Committed: Wed Aug 22 12:22:53 2018 -0700

--
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  3 +-
 .../streaming/MicroBatchExecution.scala |  5 ++-
 .../execution/streaming/StreamExecution.scala   |  4 +-
 .../sql/streaming/StateStoreMetricsTest.scala   | 44 +++-
 .../apache/spark/sql/streaming/StreamTest.scala |  2 +-
 5 files changed, 33 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 946b636..c9c5250 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -970,7 +970,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest 
{
   makeSureGetOffsetCalled,
   Execute { q =>
 // wait to reach the last offset in every partition
-q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 
3L)))
+q.awaitOffset(
+  0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)), 
streamingTimeout.toMillis)
   },
   CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
   StopStream,

http://git-wip-us.apache.org/repos/asf/spark/blob/31063249/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
--
diff --git 

svn commit: r28897 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_12_02-68ec4d6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-22 Thread pwendell
Author: pwendell
Date: Wed Aug 22 19:16:30 2018
New Revision: 28897

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_22_12_02-68ec4d6 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-25181][CORE] Limit Thread Pool size in BlockManager Master and Slave endpoints

2018-08-22 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 2381953ab -> 68ec4d641


[SPARK-25181][CORE] Limit Thread Pool size in BlockManager Master and Slave 
endpoints

## What changes were proposed in this pull request?

Limit Thread Pool size in BlockManager Master and Slave endpoints.

Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have 
thread pools with a nearly unbounded (Integer.MAX_VALUE) number of threads. In 
certain cases, this can lead to driver OOM errors. This change limits the 
thread pools to 100 threads; this should not break any existing behavior, 
because any tasks beyond that number will simply get queued.
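For illustration, a hedged sketch of a bounded daemon "cached" pool with that queueing behavior (not Spark's actual `ThreadUtils.newDaemonCachedThreadPool` implementation):

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// At most `maxThreads` threads; idle threads time out; submissions beyond the
// bound wait in the queue instead of spawning new threads.
def boundedDaemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
  val factory = new ThreadFactory {
    private val count = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }
  val pool = new ThreadPoolExecutor(
    maxThreads, maxThreads,              // core == max, so the cap is hard
    60L, TimeUnit.SECONDS,               // let idle threads die off
    new LinkedBlockingQueue[Runnable](), // unbounded queue: extra tasks wait here
    factory)
  pool.allowCoreThreadTimeOut(true)
  pool
}
```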

## How was this patch tested?

Manual testing


Closes #22176 from mukulmurthy/25181-threads.

Authored-by: Mukul Murthy 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ec4d64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ec4d64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ec4d64

Branch: refs/heads/master
Commit: 68ec4d641b87d2ab6a8cafc5d10c08253ae09e3d
Parents: 2381953
Author: Mukul Murthy 
Authored: Wed Aug 22 10:36:20 2018 -0700
Committer: Shixiong Zhu 
Committed: Wed Aug 22 10:36:20 2018 -0700

--
 .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 3 ++-
 .../org/apache/spark/storage/BlockManagerSlaveEndpoint.scala  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/68ec4d64/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 8e8f7d1..f984cf7 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -54,7 +54,8 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, 
mutable.HashSet[BlockManagerId]]
 
-  private val askThreadPool = 
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
+  private val askThreadPool =
+ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
   private implicit val askExecutionContext = 
ExecutionContext.fromExecutorService(askThreadPool)
 
   private val topologyMapper = {

http://git-wip-us.apache.org/repos/asf/spark/blob/68ec4d64/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 742cf4f..67544b2 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -37,7 +37,7 @@ class BlockManagerSlaveEndpoint(
   extends ThreadSafeRpcEndpoint with Logging {
 
   private val asyncThreadPool =
-
ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
+
ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool", 
100)
   private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(asyncThreadPool)
 
   // Operations that involve removing blocks may be slow and should be done 
asynchronously





spark git commit: [SPARK-25105][PYSPARK][SQL] Include PandasUDFType in the import all of pyspark.sql.functions

2018-08-22 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master 71f38ac24 -> 2381953ab


[SPARK-25105][PYSPARK][SQL] Include PandasUDFType in the import all of 
pyspark.sql.functions

## What changes were proposed in this pull request?

Include `PandasUDFType` in `__all__` of `pyspark.sql.functions`, so that it is exported by `from pyspark.sql.functions import *`.

## How was this patch tested?

Ran the test case from the JIRA 
[SPARK-25105](https://jira.apache.org/jira/browse/SPARK-25105?jql=project%20%3D%20SPARK%20AND%20component%20in%20(ML%2C%20PySpark%2C%20SQL%2C%20%22Structured%20Streaming%22)) 
manually in the pyspark shell.

Before:
```
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'PandasUDFType' is not defined
>>>
```
After:
```
>>> from pyspark.sql.functions import *
>>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
>>>
```

Closes #22100 from kevinyu98/spark-25105.

Authored-by: Kevin Yu 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2381953a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2381953a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2381953a

Branch: refs/heads/master
Commit: 2381953ab5d9e86d87a9ef118f28bc3f67d6d805
Parents: 71f38ac
Author: Kevin Yu 
Authored: Wed Aug 22 10:16:47 2018 -0700
Committer: Bryan Cutler 
Committed: Wed Aug 22 10:16:47 2018 -0700

--
 python/pyspark/sql/functions.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2381953a/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f583373..d58d8d1 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2931,6 +2931,7 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 blacklist = ['map', 'since', 'ignore_unicode_prefix']
 __all__ = [k for k, v in globals().items()
if not k.startswith('_') and k[0].islower() and callable(v) and k 
not in blacklist]
+__all__ += ["PandasUDFType"]
 __all__.sort()
 
 





spark git commit: [SPARK-23698][PYTHON] Resolve undefined names in Python 3

2018-08-22 Thread cutlerb
Repository: spark
Updated Branches:
  refs/heads/master e75488718 -> 71f38ac24


[SPARK-23698][PYTHON] Resolve undefined names in Python 3

## What changes were proposed in this pull request?

Fix issues arising from the fact that the builtins `file`, `long`, 
`raw_input()`, `unicode`, `xrange()`, etc. were all removed from Python 3. 
Undefined names have the potential to raise 
[NameError](https://docs.python.org/3/library/exceptions.html#NameError) at 
runtime.

## How was this patch tested?
* $ __python2 -m flake8 . --count --select=E9,F82 --show-source --statistics__
* $ __python3 -m flake8 . --count --select=E9,F82 --show-source --statistics__

holdenk

flake8 testing of https://github.com/apache/spark on Python 3.6.3

$ __python3 -m flake8 . --count --select=E901,E999,F821,F822,F823 --show-source 
--statistics__
```
./dev/merge_spark_pr.py:98:14: F821 undefined name 'raw_input'
result = raw_input("\n%s (y/n): " % prompt)
 ^
./dev/merge_spark_pr.py:136:22: F821 undefined name 'raw_input'
primary_author = raw_input(
 ^
./dev/merge_spark_pr.py:186:16: F821 undefined name 'raw_input'
pick_ref = raw_input("Enter a branch name [%s]: " % default_branch)
   ^
./dev/merge_spark_pr.py:233:15: F821 undefined name 'raw_input'
jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id)
  ^
./dev/merge_spark_pr.py:278:20: F821 undefined name 'raw_input'
fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % 
default_fix_versions)
   ^
./dev/merge_spark_pr.py:317:28: F821 undefined name 'raw_input'
raw_assignee = raw_input(
   ^
./dev/merge_spark_pr.py:430:14: F821 undefined name 'raw_input'
pr_num = raw_input("Which pull request would you like to merge? (e.g. 34): 
")
 ^
./dev/merge_spark_pr.py:442:18: F821 undefined name 'raw_input'
result = raw_input("Would you like to use the modified title? (y/n): ")
 ^
./dev/merge_spark_pr.py:493:11: F821 undefined name 'raw_input'
while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y":
  ^
./dev/create-release/releaseutils.py:58:16: F821 undefined name 'raw_input'
response = raw_input("%s [y/n]: " % msg)
   ^
./dev/create-release/releaseutils.py:152:38: F821 undefined name 'unicode'
author = unidecode.unidecode(unicode(author, "UTF-8")).strip()
 ^
./python/setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
  ^
./python/pyspark/cloudpickle.py:275:18: F821 undefined name 'buffer'
dispatch[buffer] = save_buffer
 ^
./python/pyspark/cloudpickle.py:807:18: F821 undefined name 'file'
dispatch[file] = save_file
 ^
./python/pyspark/sql/conf.py:61:61: F821 undefined name 'unicode'
if not isinstance(obj, str) and not isinstance(obj, unicode):
^
./python/pyspark/sql/streaming.py:25:21: F821 undefined name 'long'
intlike = (int, long)
^
./python/pyspark/streaming/dstream.py:405:35: F821 undefined name 'long'
return self._sc._jvm.Time(long(timestamp * 1000))
  ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:21:10: F821 
undefined name 'xrange'
for i in xrange(50):
 ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:22:14: F821 
undefined name 'xrange'
for j in xrange(5):
 ^
./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:23:18: F821 
undefined name 'xrange'
for k in xrange(20022):
 ^
20F821 undefined name 'raw_input'
20
```

Closes #20838 from cclauss/fix-undefined-names.

Authored-by: cclauss 
Signed-off-by: Bryan Cutler 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71f38ac2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71f38ac2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71f38ac2

Branch: refs/heads/master
Commit: 71f38ac242157cbede684546159f2a27892ee09f
Parents: e754887
Author: cclauss 
Authored: Wed Aug 22 10:06:59 2018 -0700
Committer: Bryan Cutler 
Committed: Wed Aug 22 10:06:59 2018 -0700

--
 dev/create-release/releaseutils.py  |  8 +++--
 dev/merge_spark_pr.py   |  2 +-
 python/pyspark/sql/conf.py  |  5 ++-
 python/pyspark/sql/streaming.py |  5 +--
 python/pyspark/streaming/dstream.py |  2 ++
 python/pyspark/streaming/tests.py   | 34 +++-
 .../resources/data/scripts/dumpdata_script.py   |  3 ++
 7 files changed, 50 insertions(+), 9 deletions(-)
--



svn commit: r28891 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_22_00_02-55f3664-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-22 Thread pwendell
Author: pwendell
Date: Wed Aug 22 07:16:44 2018
New Revision: 28891

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_22_00_02-55f3664 docs


[This commit notification would consist of 1476 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




[6/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API

2018-08-22 Thread lixiao
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index c7b74f3..946b636 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.{Files, Paths}
-import java.util.{Locale, Optional, Properties}
+import java.util.{Locale, Properties}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -44,11 +44,9 @@ import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
-import org.apache.spark.sql.types.StructType
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with 
KafkaTest {
 
@@ -118,14 +116,16 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext with Kaf
 query.nonEmpty,
 "Cannot add data when there is no query for finding the active kafka 
source")
 
-  val sources = {
+  val sources: Seq[BaseStreamingSource] = {
 query.get.logicalPlan.collect {
   case StreamingExecutionRelation(source: KafkaSource, _) => source
-  case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => 
source
+  case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, 
_) => source
 } ++ (query.get.lastExecution match {
   case null => Seq()
   case e => e.logical.collect {
-case StreamingDataSourceV2Relation(_, _, _, reader: 
KafkaContinuousReader) => reader
+case r: StreamingDataSourceV2Relation
+if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+  r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
   }
 })
   }.distinct
@@ -650,7 +650,7 @@ class KafkaMicroBatchV2SourceSuite extends 
KafkaMicroBatchSourceSuiteBase {
   makeSureGetOffsetCalled,
   AssertOnQuery { query =>
 query.logicalPlan.collect {
-  case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
+  case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => 
true
 }.nonEmpty
   }
 )
@@ -675,17 +675,16 @@ class KafkaMicroBatchV2SourceSuite extends 
KafkaMicroBatchSourceSuiteBase {
   "kafka.bootstrap.servers" -> testUtils.brokerAddress,
   "subscribe" -> topic
 ) ++ Option(minPartitions).map { p => "minPartitions" -> p}
-val reader = provider.createMicroBatchReader(
-  Optional.empty[StructType], dir.getAbsolutePath, new 
DataSourceOptions(options.asJava))
-reader.setOffsetRange(
-  Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
-  Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
-)
-val factories = reader.planInputPartitions().asScala
+val readSupport = provider.createMicroBatchReadSupport(
+  dir.getAbsolutePath, new DataSourceOptions(options.asJava))
+val config = readSupport.newScanConfigBuilder(
+  KafkaSourceOffset(Map(tp -> 0L)),
+  KafkaSourceOffset(Map(tp -> 100L))).build()
+val inputPartitions = readSupport.planInputPartitions(config)
   .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
-withClue(s"minPartitions = $minPartitions generated factories 
$factories\n\t") {
-  assert(factories.size == numPartitionsGenerated)
-  factories.foreach { f => assert(f.reuseKafkaConsumer == 
reusesConsumers) }
+withClue(s"minPartitions = $minPartitions generated factories 
$inputPartitions\n\t") {
+  assert(inputPartitions.size == numPartitionsGenerated)
+  inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == 
reusesConsumers) }
 }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java

[5/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API

2018-08-22 Thread lixiao
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
deleted file mode 100644
index 7b0ba0b..000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-
-/**
- * A variation on {@link InputPartitionReader} for use with streaming in 
continuous processing mode.
- */
-@InterfaceStability.Evolving
-public interface ContinuousInputPartitionReader extends 
InputPartitionReader {
-/**
- * Get the offset of the current record, or the start offset if no records 
have been read.
- *
- * The execution engine will call this method along with get() to keep 
track of the current
- * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
- * as a restart checkpoint.
- */
-PartitionOffset getOffset();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
new file mode 100644
index 000..9101c8a
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.PartitionReader;
+
+/**
+ * A variation on {@link PartitionReader} for use with continuous streaming 
processing.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousPartitionReader extends PartitionReader {
+
+  /**
+   * Get the offset of the current record, or the start offset if no records 
have been read.
+   *
+   * The execution engine will call this method along with get() to keep track 
of the current
+   * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
+   * as a restart checkpoint.
+   */
+  PartitionOffset getOffset();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java
new 

[3/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API

2018-08-22 Thread lixiao
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
new file mode 100644
index 000..4218fd5
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.{ForeachWriter, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.python.PythonForeachWriter
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
+import 
org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, 
StreamingWriteSupport}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[org.apache.spark.sql.sources.v2.DataSourceV2]] for forwarding data into 
the specified
+ * [[ForeachWriter]].
+ *
+ * @param writer The [[ForeachWriter]] to process all data.
+ * @param converter An object to convert internal rows to target type T. 
Either it can be
+ *  a [[ExpressionEncoder]] or a direct converter function.
+ * @tparam T The expected type of the sink.
+ */
+case class ForeachWriteSupportProvider[T](
+writer: ForeachWriter[T],
+converter: Either[ExpressionEncoder[T], InternalRow => T])
+  extends StreamingWriteSupportProvider {
+
+  override def createStreamingWriteSupport(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceOptions): StreamingWriteSupport = {
+new StreamingWriteSupport {
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+
+  override def createStreamingWriterFactory(): StreamingDataWriterFactory 
= {
+val rowConverter: InternalRow => T = converter match {
+  case Left(enc) =>
+val boundEnc = enc.resolveAndBind(
+  schema.toAttributes,
+  SparkSession.getActiveSession.get.sessionState.analyzer)
+boundEnc.fromRow
+  case Right(func) =>
+func
+}
+ForeachWriterFactory(writer, rowConverter)
+  }
+
+  override def toString: String = "ForeachSink"
+}
+  }
+}
+
+object ForeachWriteSupportProvider {
+  def apply[T](
+  writer: ForeachWriter[T],
+  encoder: ExpressionEncoder[T]): ForeachWriteSupportProvider[_] = {
+writer match {
+  case pythonWriter: PythonForeachWriter =>
+new ForeachWriteSupportProvider[UnsafeRow](
+  pythonWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))
+  case _ =>
+new ForeachWriteSupportProvider[T](writer, Left(encoder))
+}
+  }
+}
+
+case class ForeachWriterFactory[T](
+writer: ForeachWriter[T],
+rowConverter: InternalRow => T)
+  extends StreamingDataWriterFactory {
+  override def createWriter(
+  partitionId: Int,
+  taskId: Long,
+  epochId: Long): ForeachDataWriter[T] = {
+new ForeachDataWriter(writer, rowConverter, partitionId, epochId)
+  }
+}
+
+/**
+ * A [[DataWriter]] which writes data in this partition to a [[ForeachWriter]].
+ *
+ * @param writer The [[ForeachWriter]] to process all data.
+ * @param rowConverter A function which can convert [[InternalRow]] to the 
required type [[T]]
+ * @param partitionId
+ * @param epochId
+ * @tparam T The type expected by the writer.
+ */
+class ForeachDataWriter[T](
+writer: ForeachWriter[T],
+

[1/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API

2018-08-22 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 55f36641f -> e75488718


http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 4980b0c..3d21bc6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
   case s: ContinuousExecution =>
 assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure 
query is initialized")
 val reader = s.lastExecution.executedPlan.collectFirst {
-  case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReader) 
=> r
+  case DataSourceV2ScanExec(_, _, _, _, r: 
RateStreamContinuousReadSupport, _) => r
 }.get
 
 val deltaMs = numTriggers * 1000 + 300

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index 82836dc..3c973d8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -27,9 +27,9 @@ import org.apache.spark._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, 
PartitionOffset}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, 
PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.test.TestSparkSession
 
 class EpochCoordinatorSuite
@@ -40,20 +40,20 @@ class EpochCoordinatorSuite
 
   private var epochCoordinator: RpcEndpointRef = _
 
-  private var writer: StreamWriter = _
+  private var writeSupport: StreamingWriteSupport = _
   private var query: ContinuousExecution = _
   private var orderVerifier: InOrder = _
 
   override def beforeEach(): Unit = {
-val reader = mock[ContinuousReader]
-writer = mock[StreamWriter]
+val reader = mock[ContinuousReadSupport]
+writeSupport = mock[StreamingWriteSupport]
 query = mock[ContinuousExecution]
-orderVerifier = inOrder(writer, query)
+orderVerifier = inOrder(writeSupport, query)
 
 spark = new TestSparkSession()
 
 epochCoordinator
-  = EpochCoordinatorRef.create(writer, reader, query, "test", 1, spark, SparkEnv.get)
+  = EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
   }
 
   test("single epoch") {
@@ -209,12 +209,12 @@ class EpochCoordinatorSuite
   }
 
   private def verifyCommit(epoch: Long): Unit = {
-orderVerifier.verify(writer).commit(eqTo(epoch), any())
+orderVerifier.verify(writeSupport).commit(eqTo(epoch), any())
 orderVerifier.verify(query).commit(epoch)
   }
 
   private def verifyNoCommitFor(epoch: Long): Unit = {
-verify(writer, never()).commit(eqTo(epoch), any())
+verify(writeSupport, never()).commit(eqTo(epoch), any())
 verify(query, never()).commit(epoch)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 52b833a..aeef4c8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -17,73 +17,74 @@
 
 package org.apache.spark.sql.streaming.sources
 
-import java.util.Optional
-
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import 

[4/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API

2018-08-22 Thread lixiao
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index 5267f5f..e9cc399 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -21,6 +21,7 @@ import java.util.regex.Pattern
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}
 
 private[sql] object DataSourceV2Utils extends Logging {
@@ -55,4 +56,12 @@ private[sql] object DataSourceV2Utils extends Logging {
 
 case _ => Map.empty
   }
+
+  def failForUserSpecifiedSchema[T](ds: DataSourceV2): T = {
+val name = ds match {
+  case register: DataSourceRegister => register.shortName()
+  case _ => ds.getClass.getName
+}
+throw new UnsupportedOperationException(name + " source does not support user-specified schema")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 59ebb9b..c3f7b69 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -23,15 +23,11 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.MicroBatchExecution
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 /**
@@ -39,7 +35,8 @@ import org.apache.spark.util.Utils
 * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
  */
 @deprecated("Use specific logical plans like AppendData instead", "2.4.0")
-case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) extends LogicalPlan {
+case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPlan)
+  extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 }
@@ -47,46 +44,48 @@ case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) ext
 /**
  * The physical plan for writing data into data source v2.
  */
-case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan {
+case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan)
+  extends SparkPlan {
+
   override def children: Seq[SparkPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {
-val writeTask = writer.createWriterFactory()
-val useCommitCoordinator = writer.useCommitCoordinator
+val writerFactory = writeSupport.createBatchWriterFactory()
+val useCommitCoordinator = writeSupport.useCommitCoordinator
 val rdd = query.execute()
 val messages = new Array[WriterCommitMessage](rdd.partitions.length)
 
-logInfo(s"Start processing data source writer: $writer. " +
+logInfo(s"Start processing data source write support: $writeSupport. " +
   s"The input RDD has ${messages.length} partitions.")
 
 try {
   sparkContext.runJob(
 rdd,
 (context: TaskContext, iter: Iterator[InternalRow]) =>
-  DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator),
+  DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator),
 rdd.partitions.indices,
 (index, message: WriterCommitMessage) => {
   messages(index) = message
-  writer.onDataWriterCommit(message)
+  

[2/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API

2018-08-22 Thread lixiao
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
--
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 274dc37..2cdbba8 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -17,72 +17,26 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.types.StructType;
-
-public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
-
-  class Reader implements DataSourceReader {
-private final StructType schema = new StructType().add("i", "int").add("j", "int");
-
-@Override
-public StructType readSchema() {
-  return schema;
-}
-
-@Override
-public List<InputPartition<InternalRow>> planInputPartitions() {
-  return java.util.Arrays.asList(
-new JavaSimpleInputPartition(0, 5),
-new JavaSimpleInputPartition(5, 10));
-}
-  }
-
-  static class JavaSimpleInputPartition implements InputPartition<InternalRow>,
-InputPartitionReader<InternalRow> {
+import org.apache.spark.sql.sources.v2.reader.*;
 
-private int start;
-private int end;
+public class JavaSimpleDataSourceV2 implements DataSourceV2, BatchReadSupportProvider {
 
-JavaSimpleInputPartition(int start, int end) {
-  this.start = start;
-  this.end = end;
-}
-
-@Override
-public InputPartitionReader<InternalRow> createPartitionReader() {
-  return new JavaSimpleInputPartition(start - 1, end);
-}
+  class ReadSupport extends JavaSimpleReadSupport {
 
 @Override
-public boolean next() {
-  start += 1;
-  return start < end;
-}
-
-@Override
-public InternalRow get() {
-  return new GenericInternalRow(new Object[] {start, -start});
-}
-
-@Override
-public void close() throws IOException {
-
+public InputPartition[] planInputPartitions(ScanConfig config) {
+  InputPartition[] partitions = new InputPartition[2];
+  partitions[0] = new JavaRangeInputPartition(0, 5);
+  partitions[1] = new JavaRangeInputPartition(5, 10);
+  return partitions;
 }
   }
 
   @Override
-  public DataSourceReader createReader(DataSourceOptions options) {
-return new Reader();
+  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
+return new ReadSupport();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
--
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
new file mode 100644
index 000..685f9b9
--- /dev/null
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.types.StructType;
+
+abstract class JavaSimpleReadSupport implements 

[7/7] spark git commit: [SPARK-24882][SQL] improve data source v2 API

2018-08-22 Thread lixiao
[SPARK-24882][SQL] improve data source v2 API

## What changes were proposed in this pull request?

Improve the data source v2 API according to the [design 
doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing)

Summary of the changes (an illustrative sketch of the resulting read path follows this list):
1. Rename the chain `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> `InputPartitionReader` to `BatchReadSupportProvider` -> `BatchReadSupport` -> `InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar renaming happens in the streaming and write APIs.
2. Create `ScanConfig` to store query-specific information such as operator pushdown results and streaming offsets. This makes the batch and streaming `ReadSupport` (previously named `DataSourceReader`) immutable. All other methods take `ScanConfig` as input, which implies that applying operator pushdown and getting streaming offsets happen before everything else (getting input partitions, reporting statistics, etc.).
3. Split `InputPartition` into `InputPartition` and `PartitionReaderFactory`. This is a natural separation: data splitting and data reading are orthogonal and should not be mixed in one interface. It also makes the naming consistent between the read and write APIs: `PartitionReaderFactory` vs `DataWriterFactory`.
4. Separate the batch and streaming interfaces. Forcing the streaming interfaces to extend the batch ones is sometimes painful: we may need to override some batch methods to return false, or even leak streaming concepts into the batch API (e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`).
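
To make the new layering easier to picture, here is a self-contained toy model of the read path after the renames. It uses simplified signatures and made-up `Toy*` names rather than Spark's actual interfaces, so treat it as a sketch of the shape only, not as the committed API.

// Toy model; NOT Spark's real interfaces.
// Provider -> ReadSupport -> ScanConfig -> InputPartition / PartitionReaderFactory -> PartitionReader

final case class ToyScanConfig(requiredColumns: Seq[String])  // per-query state: pushdown result, offsets, ...
final case class ToyInputPartition(start: Int, end: Int)      // only describes a data split

trait ToyPartitionReader[T] {                                 // reads a single partition
  def next(): Boolean
  def get(): T
  def close(): Unit
}

trait ToyPartitionReaderFactory {                             // reading is now separate from splitting
  def createReader(partition: ToyInputPartition): ToyPartitionReader[Int]
}

trait ToyBatchReadSupport {                                   // immutable; query-specific state lives in the ScanConfig
  def newScanConfig(): ToyScanConfig
  def planInputPartitions(config: ToyScanConfig): Array[ToyInputPartition]
  def createReaderFactory(config: ToyScanConfig): ToyPartitionReaderFactory
}

trait ToyBatchReadSupportProvider {                           // entry point mixed into the DataSourceV2 class
  def createBatchReadSupport(options: Map[String, String]): ToyBatchReadSupport
}

The write side mirrors this shape: `BatchWriteSupportProvider` -> `BatchWriteSupport` -> `DataWriterFactory` -> `DataWriter`.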

Some follow-ups we should do after this PR (tracked by https://issues.apache.org/jira/browse/SPARK-25186 ):
1. Revisit the life cycle of `ReadSupport` instances. Currently it is the same as the previous `DataSourceReader`, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be ideal for batch sources. We could also consider letting `ReadSupport.newScanConfigBuilder` take `DataSourceOptions` as a parameter if we decide to change the life cycle.
2. Add `WriteConfig`. This is similar to `ScanConfig` and makes the write API more flexible, but it is only needed once we add `replaceWhere` support, and it requires changing the streaming execution engine for the new concept, so it is better done in another PR.
3. Refine the documentation. This PR adds and changes a lot of documentation, and it is very likely that others have better ideas.
4. Figure out the life cycle of `CustomMetrics`. It looks like it should be bound to a `ScanConfig`, but that requires changing `ProgressReporter` to obtain the `ScanConfig`. Better done in another PR.
5. Design a better operator pushdown API. This PR keeps the pushdown API as it was, i.e. the `SupportsPushdownXYZ` traits. A better API could use a builder pattern (a purely hypothetical sketch follows this list), but that is a complicated design that deserves its own JIRA ticket and design doc.
6. Improve the continuous streaming engine to only create a new `ScanConfig` when re-configuring.
7. Remove `SupportsPushdownCatalystFilter`. It is not actually a must-have for the file source; we can change the Hive partition pruning to use the public `Filter`.
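
Purely as a hypothetical illustration of what follow-up 5 hints at, a builder-style pushdown API could let the engine accumulate pushdown state and then freeze it into an immutable scan description. None of the names below exist in Spark and nothing like this is part of this PR; it is a sketch of the idea only.

// Hypothetical sketch; names and signatures are illustrative, not Spark API.
final case class ToyFilter(column: String, value: Any)
final case class ToyScanDescription(pushedFilters: Seq[ToyFilter], columns: Seq[String])

trait ToyScanBuilder {
  // Returns the filters the source cannot handle; the engine keeps evaluating those.
  def pushFilters(filters: Seq[ToyFilter]): Seq[ToyFilter]
  // Restricts the scan to the listed columns.
  def pruneColumns(columns: Seq[String]): ToyScanBuilder
  // Freezes the accumulated state, playing a role similar to ScanConfig in this PR.
  def build(): ToyScanDescription
}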

## How was this patch tested?

existing tests.

Closes #22009 from cloud-fan/redesign.

Authored-by: Wenchen Fan 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7548871
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7548871
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7548871

Branch: refs/heads/master
Commit: e754887182304ad0d622754e33192ebcdd515965
Parents: 55f3664
Author: Wenchen Fan 
Authored: Wed Aug 22 00:10:55 2018 -0700
Committer: Xiao Li 
Committed: Wed Aug 22 00:10:55 2018 -0700

--
 .../kafka010/KafkaContinuousReadSupport.scala   | 255 +++
 .../sql/kafka010/KafkaContinuousReader.scala| 248 ---
 .../kafka010/KafkaMicroBatchReadSupport.scala   | 401 +
 .../sql/kafka010/KafkaMicroBatchReader.scala| 402 -
 .../sql/kafka010/KafkaSourceProvider.scala  |  37 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala  | 118 -
 .../kafka010/KafkaStreamingWriteSupport.scala   | 118 +
 .../kafka010/KafkaContinuousSourceSuite.scala   |   8 +-
 .../sql/kafka010/KafkaContinuousTest.scala  |   8 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  33 +-
 .../sources/v2/BatchReadSupportProvider.java|  61 +++
 .../sources/v2/BatchWriteSupportProvider.java   |  59 +++
 .../sql/sources/v2/ContinuousReadSupport.java   |  46 --
 .../v2/ContinuousReadSupportProvider.java   |  70 +++
 .../spark/sql/sources/v2/DataSourceV2.java  |  10 +-
 .../sql/sources/v2/MicroBatchReadSupport.java   |  52 ---
 .../v2/MicroBatchReadSupportProvider.java   |  70 +++
 

spark git commit: [SPARK-25093][SQL] Avoid recompiling regexp for comments multiple times

2018-08-22 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 4a9c9d8f9 -> 55f36641f


[SPARK-25093][SQL] Avoid recompiling regexp for comments multiple times

## What changes were proposed in this pull request?

This PR moves the compilation of the code-formatting regexp out of the method that is called for each code block when splitting expressions, so the regexp is no longer recompiled on every call.
Credit should be given to Izek Greenfield.
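
As a minimal illustration of the pattern (not the patched Spark code itself; the object and method names below are made up), hoisting the compiled regex into a `val` means compilation happens once per class load instead of once per call:

import scala.util.matching.Regex

object CommentStripper {
  // Compiled once, when this object is first initialized.
  private val BlockComment: Regex = """(?s)/\*.*?\*/""".r

  // Called for every code block; reuses the precompiled pattern.
  def strip(code: String): String = BlockComment.replaceAllIn(code, "")
}

// Before the change, the `.r` call sat inside the method body, so the pattern
// was recompiled on every invocation.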

## How was this patch tested?

existing UTs

Closes #22135 from mgaido91/SPARK-25093.

Authored-by: Marco Gaido 
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55f36641
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55f36641
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55f36641

Branch: refs/heads/master
Commit: 55f36641ff20114b892795f100da7efb79b0cc32
Parents: 4a9c9d8
Author: Marco Gaido 
Authored: Wed Aug 22 14:31:51 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Aug 22 14:31:51 2018 +0800

--
 .../scala/org/apache/spark/deploy/worker/Worker.scala|  4 ++--
 core/src/main/scala/org/apache/spark/util/Utils.scala| 11 ++-
 .../apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala |  6 +++---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala  |  3 ++-
 .../sql/catalyst/expressions/codegen/CodeFormatter.scala | 10 +-
 .../main/scala/org/apache/spark/sql/types/DataType.scala |  3 ++-
 .../org/apache/spark/streaming/dstream/DStream.scala | 10 +-
 7 files changed, 25 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/55f36641/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ee1ca0b..cbd812a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -758,6 +758,7 @@ private[deploy] class Worker(
 private[deploy] object Worker extends Logging {
   val SYSTEM_NAME = "sparkWorker"
   val ENDPOINT_NAME = "Worker"
+  private val SSL_NODE_LOCAL_CONFIG_PATTERN = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
 
   def main(argStrings: Array[String]) {
 Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
@@ -803,9 +804,8 @@ private[deploy] object Worker extends Logging {
   }
 
   def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
-val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
 val result = cmd.javaOpts.collectFirst {
-  case pattern(_result) => _result.toBoolean
+  case SSL_NODE_LOCAL_CONFIG_PATTERN(_result) => _result.toBoolean
 }
 result.getOrElse(false)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/55f36641/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7ec707d..e6646bd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1409,13 +1409,14 @@ private[spark] object Utils extends Logging {
 }
   }
 
+  // A regular expression to match classes of the internal Spark API's
+  // that we want to skip when finding the call site of a method.
+  private val SPARK_CORE_CLASS_REGEX =
+    """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
+  private val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def sparkInternalExclusionFunction(className: String): Boolean = {
-// A regular expression to match classes of the internal Spark API's
-// that we want to skip when finding the call site of a method.
-val SPARK_CORE_CLASS_REGEX =
-  """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
-val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r
 val SCALA_CORE_CLASS_PREFIX = "scala"
 val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
   SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined

http://git-wip-us.apache.org/repos/asf/spark/blob/55f36641/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala