[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223744025
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 val actualSchema =
   StructType(schema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
 
-val linesWithoutHeader = if (parsedOptions.headerFlag && 
maybeFirstLine.isDefined) {
-  val firstLine = maybeFirstLine.get
-  val parser = new CsvParser(parsedOptions.asParserSettings)
-  val columnNames = parser.parseLine(firstLine)
-  CSVDataSource.checkHeaderColumnNames(
+val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+  val headerChecker = new CSVHeaderChecker(
 actualSchema,
-columnNames,
-csvDataset.getClass.getCanonicalName,
-parsedOptions.enforceSchema,
-sparkSession.sessionState.conf.caseSensitiveAnalysis)
+parsedOptions,
+source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
--- End diff --

Makes sense. If it's just `toString`, of course I can fix it here since the change is small, although it's orthogonal.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223743548
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
@@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
 maybeFirstLine.map(csvParser.parseLine(_)) match {
   case Some(firstRow) if firstRow != null =>
 val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
-val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, 
parsedOptions)
--- End diff --

Because most of the code here uses the `CSVUtils...` one; I just followed that.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223728765
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema 
are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used 
in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is 
the start of the file.
+ *  if unknown or not applicable (for instance when 
the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

It's under the execution package, which is meant to be private. It's accessed in DataFrameReader, so it would otherwise need to be `private[sql]`, but that modifier was removed from this package in SPARK-16964 for exactly this reason.
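For context, a self-contained illustration (not Spark code; the names are made up) of what the package-qualified modifier buys, and why it becomes unnecessary once the whole `execution` package is treated as private:

```scala
// `private[sql]` makes a definition visible to everything under the enclosing
// `sql` package while hiding it from the outside, so a reader one package up
// can still construct the checker.
package org.example.sql.execution {
  private[sql] class HeaderChecker {
    def check(): Unit = println("checking header")
  }
}

package org.example.sql {
  object Reader {
    def read(): Unit = new execution.HeaderChecker().check() // OK: inside `sql`
  }
}
```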


---




[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22675#discussion_r223727369
  
--- Diff: docs/ml-datasource.md ---
@@ -0,0 +1,51 @@
+---
+layout: global
+title: Data sources
+displayTitle: Data sources
+---
+
+In this section, we introduce how to use data source in ML to load data.
+Beside some general data sources "parquat", "csv", "json", "jdbc", we also 
provide some specific data source for ML.
+
+**Table of Contents**
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+## Image data source
+
+This image data source is used to load libsvm data files from directory.
+
+
+

+[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource)
+implements Spark SQL data source API for loading image data as DataFrame.
+The loaded DataFrame has one StructType column: "image". containing image 
data stored as image schema.
+
+{% highlight scala %}
+scala> spark.read.format("image").load("data/mllib/images/origin")
+res1: org.apache.spark.sql.DataFrame = [image: struct]
+{% endhighlight %}
+
+
+

+[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html)
--- End diff --

cc @cloud-fan and @gatorsmile, am I missing something?


---




[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22675#discussion_r223725196
  
--- Diff: docs/ml-datasource.md ---
@@ -0,0 +1,51 @@
+---
+layout: global
+title: Data sources
+displayTitle: Data sources
+---
+
+In this section, we introduce how to use data source in ML to load data.
+Beside some general data sources "parquat", "csv", "json", "jdbc", we also 
provide some specific data source for ML.
+
+**Table of Contents**
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+## Image data source
+
+This image data source is used to load libsvm data files from directory.
+
+
+

+[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource)
+implements Spark SQL data source API for loading image data as DataFrame.
+The loaded DataFrame has one StructType column: "image". containing image 
data stored as image schema.
+
+{% highlight scala %}
+scala> spark.read.format("image").load("data/mllib/images/origin")
+res1: org.apache.spark.sql.DataFrame = [image: struct]
+{% endhighlight %}
+
+
+

+[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html)
--- End diff --

I meant that (external) Avro was merged into `external/...` in Apache Spark as a separate module for the reason above. The image data source is merged into Spark's main code rather than a separate module. I don't object to bringing an external data source into Apache Spark and I don't doubt your judgement; however, I was wondering why this lives in Spark's main code when the ideal approach would be to put it under `external/...`.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223696096
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -547,4 +548,27 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
   Map("pretty" -> "true"))),
   Seq(Row(expected)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
--- End diff --

@MaxGekk, can we add a test where the malformed-record column is manually specified in the schema? For instance:


https://github.com/apache/spark/blob/a8a1ac01c4732f8a738b973c8486514cd88bf99b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala#L1125-L1133
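For reference, a hedged sketch of roughly what I mean (the `_corrupt_record` name and the option below are assumptions about the new `FailureSafeParser`-based behaviour, not the merged test):

```scala
// Assumed shape, e.g. in spark-shell: pass a schema that already contains the
// corrupt-record column and point `columnNameOfCorruptRecord` at it, then
// inspect how the malformed input {"a" 1} is reflected in that column.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import spark.implicits._

val df = Seq("""{"a" 1}""", """{"a": 2}""").toDF("value")
val schemaWithCorrupt = new StructType()
  .add("a", IntegerType)
  .add("_corrupt_record", StringType)

df.select(from_json($"value", schemaWithCorrupt,
  Map("mode" -> "PERMISSIVE",
      "columnNameOfCorruptRecord" -> "_corrupt_record"))).show(false)
```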


---




[GitHub] spark issue #22656: [SPARK-25669][SQL] Check CSV header only when it exists

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22656
  
Merged to master and branch-2.4.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223594430
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema 
are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used 
in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is 
the start of the file.
+ *  if unknown or not applicable (for instance when 
the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
+schema: StructType,
+options: CSVOptions,
+source: String,
+isStartOfFile: Boolean = false) extends Logging {
+
+  // Indicates if it is set to `false`, comparison of column names and 
schema field
+  // names is not case sensitive.
+  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+  // Indicates if it is `true`, column names are ignored otherwise the CSV 
column
+  // names are checked for conformance to the schema. In the case if
+  // the column name don't conform to the schema, an exception is thrown.
+  private val enforceSchema = options.enforceSchema
+
+  /**
+   * Checks that column names in a CSV header and field names in the 
schema are the same
+   * by taking into account case sensitivity.
+   *
+   * @param columnNames names of CSV columns that must be checked against 
to the schema.
+   */
+  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
+if (columnNames != null) {
+  val fieldNames = schema.map(_.name).toIndexedSeq
+  val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)
+  var errorMessage: Option[String] = None
+
+  if (headerLen == schemaSize) {
+var i = 0
+while (errorMessage.isEmpty && i < headerLen) {
+  var (nameInSchema, nameInHeader) = (fieldNames(i), 
columnNames(i))
+  if (!caseSensitive) {
+// scalastyle:off caselocale
+nameInSchema = nameInSchema.toLowerCase
+nameInHeader = nameInHeader.toLowerCase
+// scalastyle:on caselocale
+  }
+  if (nameInHeader != nameInSchema) {
+errorMessage = Some(
+  s"""|CSV header does not conform to the schema.
+  | Header: ${columnNames.mkString(", ")}
+  | Schema: ${fieldNames.mkString(", ")}
+  |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
+  |$source""".stripMargin)
--- End diff --

This is the only diff. Previously it was

```
 |CSV file: $fileName""".stripMargin)
```

and the new `source` string ends up producing the class of the source here. See
(https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512)

That is the only change in this method.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223594011
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema 
are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used 
in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is 
the start of the file.
+ *  if unknown or not applicable (for instance when 
the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
+schema: StructType,
+options: CSVOptions,
+source: String,
+isStartOfFile: Boolean = false) extends Logging {
+
+  // Indicates if it is set to `false`, comparison of column names and 
schema field
+  // names is not case sensitive.
+  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+  // Indicates if it is `true`, column names are ignored otherwise the CSV 
column
+  // names are checked for conformance to the schema. In the case if
+  // the column name don't conform to the schema, an exception is thrown.
+  private val enforceSchema = options.enforceSchema
+
+  /**
+   * Checks that column names in a CSV header and field names in the 
schema are the same
+   * by taking into account case sensitivity.
+   *
+   * @param columnNames names of CSV columns that must be checked against 
to the schema.
+   */
+  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
--- End diff --

It's moved as-is, except for the parameters in its signature.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593838
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
   inputStream: InputStream,
   shouldDropHeader: Boolean,
   tokenizer: CsvParser): Iterator[Array[String]] = {
-convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => 
tokens)
+val handleHeader: () => Unit =
+  () => if (shouldDropHeader) tokenizer.parseNext
+
+convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
   }
 
   /**
* Parses a stream that contains CSV strings and turns it into an 
iterator of rows.
*/
   def parseStream(
   inputStream: InputStream,
-  shouldDropHeader: Boolean,
   parser: UnivocityParser,
-  schema: StructType,
-  checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+  headerChecker: CSVHeaderChecker,
+  schema: StructType): Iterator[InternalRow] = {
 val tokenizer = parser.tokenizer
 val safeParser = new FailureSafeParser[Array[String]](
   input => Seq(parser.convert(input)),
   parser.options.parseMode,
   schema,
   parser.options.columnNameOfCorruptRecord,
   parser.options.multiLine)
-convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { 
tokens =>
+
+val handleHeader: () => Unit =
+  () => headerChecker.checkHeaderColumnNames(tokenizer)
--- End diff --

This matches the code structure of `parseStream` and `parseIterator`, which are used in the multiLine and non-multiLine paths.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593894
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
 ---
@@ -90,6 +89,49 @@ object CSVUtils {
   None
 }
   }
+
+  /**
+   * Generates a header from the given row which is null-safe and 
duplicate-safe.
+   */
+  def makeSafeHeader(
--- End diff --

It's moved as-is.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593701
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
   inputStream: InputStream,
   shouldDropHeader: Boolean,
   tokenizer: CsvParser): Iterator[Array[String]] = {
-convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => 
tokens)
+val handleHeader: () => Unit =
+  () => if (shouldDropHeader) tokenizer.parseNext
--- End diff --

This is used in the schema inference path, where we don't check the header. Here it only drops the header.


---




[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22676
  
cc @cloud-fan and @MaxGekk 


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/22676

[SPARK-25684][SQL] Organize header related codes in CSV datasource

## What changes were proposed in this pull request?

1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).
  Rationale:
    - Historically, and in the first round of refactoring (which I did), I intended to put all CSV-specific handling (like options), filtering, header extraction, etc. into `CSVUtils`.
    - See `JsonDataSource`. Now `CSVDataSource` is quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we had better keep them as consistent as possible.

2. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is).
  Rationale:
    - Similar reasons to 1.

3. Put the `enforceSchema` logic into `CSVHeaderChecker`.
    - Header checking and column pruning were added (per https://github.com/apache/spark/pull/20894 and https://github.com/apache/spark/pull/21296), but some of the code, such as that from https://github.com/apache/spark/pull/21296, is duplicated.
    - Also, the header-checking code is scattered here and there, which is quite error-prone, so we had better put it in a single place (see https://github.com/apache/spark/pull/22656). A minimal sketch of the consolidated check is shown after this list.

## How was this patch tested?

Existing tests should cover this.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark refactoring-csv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22676


commit 56906680ab7d5d63be04bac2c3a19bb52baa3025
Author: hyukjinkwon 
Date:   2018-10-09T07:26:08Z

Organize header related codes in CSV datasource




---




[GitHub] spark pull request #22656: [SPARK-25669][SQL] Check CSV header only when it ...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22656#discussion_r223566522
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,7 +505,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 val actualSchema =
   StructType(schema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
 
-val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+val linesWithoutHeader = if (parsedOptions.headerFlag && 
maybeFirstLine.isDefined) {
--- End diff --

LGTM, but it really needs some refactoring. Let me give it a shot.


---




[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22675#discussion_r223552437
  
--- Diff: docs/ml-datasource.md ---
@@ -0,0 +1,51 @@
+---
+layout: global
+title: Data sources
+displayTitle: Data sources
+---
+
+In this section, we introduce how to use data source in ML to load data.
+Beside some general data sources "parquat", "csv", "json", "jdbc", we also 
provide some specific data source for ML.
+
+**Table of Contents**
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+## Image data source
+
+This image data source is used to load libsvm data files from directory.
+
+
+

+[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource)
+implements Spark SQL data source API for loading image data as DataFrame.
+The loaded DataFrame has one StructType column: "image". containing image 
data stored as image schema.
+
+{% highlight scala %}
+scala> spark.read.format("image").load("data/mllib/images/origin")
+res1: org.apache.spark.sql.DataFrame = [image: struct]
+{% endhighlight %}
+
+
+

+[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html)
--- End diff --

cc @mengxr as well


---




[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22675#discussion_r223552386
  
--- Diff: docs/ml-datasource.md ---
@@ -0,0 +1,51 @@
+---
+layout: global
+title: Data sources
+displayTitle: Data sources
+---
+
+In this section, we introduce how to use data source in ML to load data.
+Beside some general data sources "parquat", "csv", "json", "jdbc", we also 
provide some specific data source for ML.
+
+**Table of Contents**
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+## Image data source
+
+This image data source is used to load libsvm data files from directory.
+
+
+

+[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource)
+implements Spark SQL data source API for loading image data as DataFrame.
+The loaded DataFrame has one StructType column: "image". containing image 
data stored as image schema.
+
+{% highlight scala %}
+scala> spark.read.format("image").load("data/mllib/images/origin")
+res1: org.apache.spark.sql.DataFrame = [image: struct]
+{% endhighlight %}
+
+
+

+[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html)
--- End diff --

Out of curiosity, why did we put the image source inside Spark's main code rather than in a separate module? (See also https://github.com/apache/spark/pull/21742#discussion_r201552008.) Avro was put in as a separate module.


---




[GitHub] spark pull request #22653: [SPARK-25659][PYTHON][TEST] Test type inference s...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22653#discussion_r223552056
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1149,6 +1149,75 @@ def test_infer_schema(self):
 result = self.spark.sql("SELECT l[0].a from test2 where d['key'].d 
= '2'")
 self.assertEqual(1, result.head()[0])
 
+def test_infer_schema_specification(self):
+from decimal import Decimal
+
+class A(object):
+def __init__(self):
+self.a = 1
+
+data = [
+True,
+1,
+"a",
+u"a",
--- End diff --

Fair point. I will change it when I fix some code around here.


---




[GitHub] spark issue #22615: [SPARK-25016][BUILD][CORE] Remove support for Hadoop 2.6

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22615
  
I want to see the configurations .. 


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223550790
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records. For example, the JSON string `{"a" 1}` with the 
schema `a INT` is converted to `null` by previous versions but Spark 3.0 
converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects 
are considered as invalid and converted to `null` if specified schema is 
`StructType`. Since Spark 3.0, the input is considered as a valid JSON array 
and only its first element is parsed if it conforms to the specified 
`StructType`.
--- End diff --

The last option sounds better to me, but can we fill in the corrupt row when the corrupt field name is specified in the schema?


---




[GitHub] spark issue #22653: [SPARK-25659][PYTHON][TEST] Test type inference specific...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22653
  
Merged to master.

Thank you @BryanCutler. Let me try to make sure I take further actions for 
related items.


---




[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22635
  
Yea, the same issue exists in Pandas UDFs too (quickly double-checked), and this PR fixes it. That code path is actually one and the same place, FYI.


---




[GitHub] spark issue #22653: [SPARK-25659][PYTHON][TEST] Test type inference specific...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22653
  
Let me open a JIRA after taking some further looks. I checked a few places in the code and it looks like it's not easy to support `None`, for instance.


---




[GitHub] spark pull request #22653: [SPARK-25659][PYTHON][TEST] Test type inference s...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22653#discussion_r223520528
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1149,6 +1149,75 @@ def test_infer_schema(self):
 result = self.spark.sql("SELECT l[0].a from test2 where d['key'].d 
= '2'")
 self.assertEqual(1, result.head()[0])
 
+def test_infer_schema_specification(self):
+from decimal import Decimal
+
+class A(object):
+def __init__(self):
+self.a = 1
+
+data = [
+True,
+1,
+"a",
+u"a",
+datetime.date(1970, 1, 1),
+datetime.datetime(1970, 1, 1, 0, 0),
+1.0,
+array.array("d", [1]),
+[1],
+(1, ),
+{"a": 1},
+bytearray(1),
+Decimal(1),
+Row(a=1),
+Row("a")(1),
+A(),
--- End diff --

Yea, it uses the `__dict__` attribute .. looks like that's not possible in UDFs .. possibly an issue.


---




[GitHub] spark pull request #22545: [SPARK-25525][SQL][PYSPARK] Do not update conf fo...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22545#discussion_r223408456
  
--- Diff: python/pyspark/sql/session.py ---
@@ -156,7 +156,7 @@ def getOrCreate(self):
 default.
 
 >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
->>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") 
== "v1"
+>>> s1.conf.get("k1") == "v1"
--- End diff --

In that case, we might have to put the behaviour changes from https://github.com/apache/spark/pull/18536 into the migration guide as well.


---




[GitHub] spark issue #22657: [SPARK-25670][TEST] Reduce number of tested timezones in...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22657
  
I agree. We are not testing the `commons-lang3` library here.


---




[GitHub] spark pull request #22631: [SPARK-25605][TESTS] Run cast string to timestamp...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22631#discussion_r223286771
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
 ---
@@ -110,7 +112,7 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("cast string to timestamp") {
-for (tz <- ALL_TIMEZONES) {
+for (tz <- Random.shuffle(ALL_TIMEZONES).take(50)) {
--- End diff --

I mean, some tests with randomized input, say an integer-range input, are fine in common sense, but this case is different, isn't it?


---




[GitHub] spark pull request #22631: [SPARK-25605][TESTS] Run cast string to timestamp...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22631#discussion_r223285813
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
 ---
@@ -110,7 +112,7 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("cast string to timestamp") {
-for (tz <- ALL_TIMEZONES) {
+for (tz <- Random.shuffle(ALL_TIMEZONES).take(50)) {
--- End diff --

I think tests need to be deterministic in general as well.

In this particular case, ideally we should categorize the time zones and test a fixed set: for instance, a time zone with DST, one without DST, and some exceptional cases such as this one, which Python 3.6 addressed lately (https://github.com/python/cpython/blob/e42b705188271da108de42b55d9344642170aa2b/Lib/datetime.py#L1572-L1574), IMHO.

Of course, this approach requires a lot of investigation and overhead. So, as an alternative, I would be inclined to go for Sean's approach (https://github.com/apache/spark/pull/22631/files#r223224573) for this particular case.

For randomness, I think we should primarily have a deterministic set of tests first. Maybe we could additionally have some randomized input to cover cases we haven't foreseen, but that's secondary.
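To make that concrete, a minimal sketch (the zone IDs are illustrative picks, not a vetted list) of iterating over a fixed, categorized set of time zones instead of a random sample:

```scala
import java.util.TimeZone

// One representative zone per category, so failures are reproducible:
// no DST, DST, and an unusual +05:45 offset.
val representativeTimeZones = Seq(
  "UTC",
  "America/Los_Angeles",
  "Asia/Kathmandu"
).map(TimeZone.getTimeZone)

representativeTimeZones.foreach { tz =>
  // the "cast string to timestamp" assertions would run under `tz` here
  println(s"testing with ${tz.getID}")
}
```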


---




[GitHub] spark issue #22655: [SPARK-25666][PYTHON] Internally document type conversio...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22655
  
Merged to master.


---




[GitHub] spark issue #22655: [SPARK-25666][PYTHON] Internally document type conversio...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22655
  
I am getting this in. This is an ongoing effort and it just documents them 
internally for now.


---




[GitHub] spark issue #22669: [SPARK-25677] [Doc] spark.io.compression.codec = org.apa...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22669
  
Merged to master and branch-2.4.


---




[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22635
  
Merged to master and branch-2.4.


---




[GitHub] spark issue #22667: [SPARK-25673][BUILD] Remove Travis CI which enables Java...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22667
  
Merged to master and branch-2.4.

Thanks @srowen and @felixcheung.


---




[GitHub] spark issue #22669: [SPARK-25677] [Doc] spark.io.compression.codec = org.apa...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22669
  
ok to test


---




[GitHub] spark issue #22655: [WIP][SPARK-25666][PYTHON] Internally document type conv...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22655
  
Let me make this table for Pandas UDFs too and then open another JIRA (or a mailing-list thread) to discuss this further. I need more investigation to propose the desired behaviour targeting 3.0.


---




[GitHub] spark issue #22665: add [openjdk11] to Travis build matrix

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22665
  
@sullis, this file will be obsolete per 
https://github.com/apache/spark/pull/22667. This shall be closed.


---




[GitHub] spark pull request #22667: [SPARK-25673][BUILD] Remove Travis CI which enabl...

2018-10-07 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/22667

[SPARK-25673][BUILD] Remove Travis CI which enables Java lint check

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/12980 added Travis CI file mainly for 
linter because we disabled Java lint check in Jenkins.

It's enabled as of https://github.com/apache/spark/pull/21399 and now SBT 
runs it. Looks we can now remove the file added before.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark SPARK-25673

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22667.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22667


commit 7a535db83ba4b47c489166fb2b149fbb32b0aba4
Author: hyukjinkwon 
Date:   2018-10-08T02:00:51Z

Remove Travis CI which enables Java lint check




---




[GitHub] spark issue #22667: [SPARK-25673][BUILD] Remove Travis CI which enables Java...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22667
  
cc @srowen and @dongjoon-hyun 


---




[GitHub] spark issue #22659: [SPARK-25623][TEST] Reduce test time of LogisticRegressi...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22659
  
ok to test


---




[GitHub] spark issue #22653: [SPARK-25659][PYTHON][TEST] Test type inference specific...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22653
  
@BryanCutler and @viirya, would you mind taking a look, please?


---




[GitHub] spark pull request #22655: [SPARK-25666][PYTHON] Internally document type co...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22655#discussion_r223218608
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2733,6 +2733,33 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
+
+# The following table shows most of Python data and SQL type 
conversions in normal UDFs that
+# are not yet visible to the user. Some of behaviors are buggy and 
might be changed in the near
+# future. The table might have to be eventually documented externally.
+# Please see SPARK-25666's PR to see the codes in order to generate 
the table below.
+#
+# 
+-+--+--+--+---+--+--++-+--+--+-++--+--+--+-+-+
  # noqa
+# |SQL Type \ Python 
Value(Type)|None(NoneType)|True(bool)|1(int)|1(long)|a(str)|a(unicode)|
1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', 
[1])(array)|[1](list)| (1,)(tuple)|ABC(bytearray)|1(Decimal)|{'a': 
1}(dict)|Row(a=1)(Row)|Row(a=1)(Row)|  # noqa
--- End diff --

Right, one was `Row(a=1)` and the other one was the namedtuple approach, `Row("a")(1)`. Let me try to update it.


---




[GitHub] spark pull request #22655: [SPARK-25666][PYTHON] Internally document type co...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22655#discussion_r223218561
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2733,6 +2733,33 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
+
+# The following table shows most of Python data and SQL type 
conversions in normal UDFs that
+# are not yet visible to the user. Some of behaviors are buggy and 
might be changed in the near
+# future. The table might have to be eventually documented externally.
+# Please see SPARK-25666's PR to see the codes in order to generate 
the table below.
+#
+# 
+-+--+--+--+---+--+--++-+--+--+-++--+--+--+-+-+
  # noqa
+# |SQL Type \ Python 
Value(Type)|None(NoneType)|True(bool)|1(int)|1(long)|a(str)|a(unicode)|
1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', 
[1])(array)|[1](list)| (1,)(tuple)|ABC(bytearray)|1(Decimal)|{'a': 
1}(dict)|Row(a=1)(Row)|Row(a=1)(Row)|  # noqa
+# 
+-+--+--+--+---+--+--++-+--+--+-++--+--+--+-+-+
  # noqa
+# | null|  None|  None|  None|   
None|  None|  None|None| None|  
None|  None| None|None|  None|  
None|  None|X|X|  # noqa
+# |  boolean|  None|  True|  None|   
None|  None|  None|None| None|  
None|  None| None|None|  None|  
None|  None|X|X|  # noqa
+# |  tinyint|  None|  None| 1| 
 1|  None|  None|None| None|  
None|  None| None|None|  None|  
None|  None|X|X|  # noqa
+# | smallint|  None|  None| 1| 
 1|  None|  None|None| None|  
None|  None| None|None|  None|  
None|  None|X|X|  # noqa
+# |  int|  None|  None| 1| 
 1|  None|  None|None| None|  
None|  None| None|None|  None|  
None|  None|X|X|  # noqa
+# |   bigint|  None|  None| 1| 
 1|  None|  None|None| None|  
None|  None| None|None|  None|  
None|  None|X|X|  # noqa
+# |   string|  None|  true| 1| 
 1| a| a|java.util.Gregori...| java.util.Gregori...|   
1.0|   [I@7f1970e1|  [1]|[Ljava.lang.Objec...|   [B@284838a9|   
  1| {a=1}|X|X|  # noqa
--- End diff --

Hm .. I see, the type is not clear here. Let me think about this a bit more.

`[B@284838a9` is quite buggy behaviour - we should fix it. So I was thinking of documenting this internally, since we already spent much time figuring out how it works for each case individually (at https://github.com/apache/spark/pull/20163).


---




[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22635
  
Nice catch @viirya LGTM.


---




[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22635#discussion_r223218449
  
--- Diff: python/pyspark/accumulators.py ---
@@ -109,10 +109,14 @@
 
 def _deserialize_accumulator(aid, zero_value, accum_param):
 from pyspark.accumulators import _accumulatorRegistry
-accum = Accumulator(aid, zero_value, accum_param)
-accum._deserialized = True
-_accumulatorRegistry[aid] = accum
-return accum
+# If this certain accumulator was deserialized, don't overwrite it.
+if aid in _accumulatorRegistry:
--- End diff --

Ah, so the problem is that this accumulator is de/serialized multiple times and `_deserialize_accumulator` modifies the global state multiple times. I see. LGTM.


---




[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22635#discussion_r223218361
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3603,6 +3603,31 @@ def test_repr_behaviors(self):
 self.assertEquals(None, df._repr_html_())
 self.assertEquals(expected, df.__repr__())
 
+# SPARK-25591
+def test_same_accumulator_in_udfs(self):
+from pyspark.sql.functions import udf
+
+data_schema = StructType([StructField("a", DoubleType(), True),
+  StructField("b", DoubleType(), True)])
+data = self.spark.createDataFrame([[1.0, 2.0]], schema=data_schema)
+
+test_accum = self.sc.accumulator(0.0)
+
+def first_udf(x):
+test_accum.add(1.0)
+return x
+
+def second_udf(x):
+test_accum.add(100.0)
+return x
+
+func_udf = udf(first_udf, DoubleType())
+func_udf2 = udf(second_udf, DoubleType())
+data = data.withColumn("out1", func_udf(data["a"]))
+data = data.withColumn("out2", func_udf2(data["b"]))
+data.collect()
+self.assertEqual(test_accum.value, 101)
--- End diff --

@viirya, can we just use ints for the data and the accumulator in this test case as well?


---




[GitHub] spark issue #22610: [SPARK-25461][PySpark][SQL] Add document for mismatch be...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22610
  
Merged to master.


---




[GitHub] spark pull request #22610: [SPARK-25461][PySpark][SQL] Add document for mism...

2018-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r223217249
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,12 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 can fail on special rows, the workaround is to incorporate the 
condition into the functions.
 
 .. note:: The user-defined functions do not take keyword arguments on 
the calling side.
+
+.. note:: The data type of returned `pandas.Series` from the 
user-defined functions should be
+matched with defined returnType (see :meth:`types.to_arrow_type` 
and
+:meth:`types.from_arrow_type`). When there is mismatch between 
them, Spark might do
+conversion on returned data. The conversion is not guaranteed to 
be correct and results
+should be checked for accuracy by users.
--- End diff --

I am merging this since this describes the current status but let's make it 
clear and try to get rid of this note within 3.0.


---




[GitHub] spark pull request #22647: [SPARK-25655] [BUILD] Add -Pspark-ganglia-lgpl to...

2018-10-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22647#discussion_r223186506
  
--- Diff: 
external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
 ---
@@ -64,11 +64,12 @@ class GangliaSink(val property: Properties, val 
registry: MetricRegistry,
   val ttl = 
propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
   val dmax = 
propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX)
   val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
-.map(u => 
GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
+.map(u => 
GMetric.UDPAddressingMode.valueOf(u.toUpperCase(Locale.Root)))
--- End diff --

That's odd. Let me check soon, within a few days.


---




[GitHub] spark issue #22655: [SPARK-25666][PYTHON] Internally document type conversio...

2018-10-06 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22655
  
cc @cloud-fan, @viirya and @BryanCutler, WDYT?


---




[GitHub] spark pull request #22655: [SPARK-25666][PYTHON] Internally document type co...

2018-10-06 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/22655

[SPARK-25666][PYTHON] Internally document type conversion between Python 
data and SQL types in normal UDFs

### What changes were proposed in this pull request?

We are facing some problems with type conversions between Python data and SQL types in UDFs (Pandas UDFs as well).
It's even difficult to identify the problems (see https://github.com/apache/spark/pull/20163 and https://github.com/apache/spark/pull/22610).

This PR aims to internally document the type conversion table. Some of the conversions look buggy and we should fix them.

```python
import array
import datetime
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf


data = [
None,
True,
1,
1L, # Python 2 only
"a",
u"a",
datetime.date(1970, 1, 1),
datetime.datetime(1970, 1, 1, 0, 0),
1.0,
array.array("i", [1]),
[1],
(1,),
bytearray([65, 66, 67]),
Decimal(1),
{"a": 1},
Row(a=1),
Row("a")(1),
]

types =  [
NullType(),
BooleanType(),
ByteType(),
ShortType(),
IntegerType(),
LongType(),
StringType(),
DateType(),
TimestampType(),
FloatType(),
DoubleType(),
ArrayType(IntegerType()),
BinaryType(),
DecimalType(10, 0),
MapType(StringType(), IntegerType()),
StructType([StructField("_1", IntegerType())]),
]


df = spark.range(1)
results = []
for t in types:
result = []
for v in data:
try:
row = df.select(udf(lambda: v, t)()).first()
result.append(row[0])
except Exception:
result.append("X")
results.append([t.simpleString()] + map(str, result))

schema = ["SQL Type \\ Python Value(Type)"] + map(lambda v: "%s(%s)" % 
(str(v), type(v).__name__), data)
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 
20, False)
print("\n".join(map(lambda line: "# %s  # noqa" % line, 
strings.strip().split("\n"
```

## How was this patch tested?

Manually tested and lint check.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark SPARK-25666

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22655


commit 3084be1de3ff58a9258dacfb8d7cf575df3fb3c9
Author: hyukjinkwon 
Date:   2018-10-06T10:59:46Z

Internally document type conversion between Python data and SQL types in 
UDFs




---




[GitHub] spark pull request #22653: [SPARK-25659][PYTHON] Test type inference specifi...

2018-10-06 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/22653

[SPARK-25659][PYTHON] Test type inference specification for createDataFrame 
in PySpark

## What changes were proposed in this pull request?

This PR proposes to specify the type inference behaviour with simple e2e tests. It looks like we are not cleanly testing that logic.

For instance, see 
https://github.com/apache/spark/blob/08c76b5d39127ae207d9d1fff99c2551e6ce2581/python/pyspark/sql/types.py#L894-L905

It looks like we intended to support `datetime.time` and `None` for type inference too, but it does not work:

```
>>> spark.createDataFrame([[datetime.time()]])
Traceback (most recent call last):
  File "", line 1, in 
  File "/.../spark/python/pyspark/sql/session.py", line 751, in 
createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 432, in 
_createFromLocal
data = [schema.toInternal(row) for row in data]
  File "/.../spark/python/pyspark/sql/types.py", line 604, in toInternal
for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/.../spark/python/pyspark/sql/types.py", line 604, in 
for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/.../spark/python/pyspark/sql/types.py", line 442, in toInternal
return self.dataType.toInternal(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 193, in toInternal
else time.mktime(dt.timetuple()))
AttributeError: 'datetime.time' object has no attribute 'timetuple'
>>> spark.createDataFrame([[None]])
Traceback (most recent call last):
  File "", line 1, in 
  File "/.../spark/python/pyspark/sql/session.py", line 751, in 
createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 419, in 
_createFromLocal
struct = self._inferSchemaFromList(data, names=schema)
  File "/.../python/pyspark/sql/session.py", line 353, in 
_inferSchemaFromList
raise ValueError("Some of types cannot be determined after inferring")
ValueError: Some of types cannot be determined after inferring
```
## How was this patch tested?

Manual tests and unit tests were added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark SPARK-25659

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22653


commit 8fddb9dca26ce36be1f7eaf0d356bf78070486f9
Author: hyukjinkwon 
Date:   2018-10-06T09:26:04Z

Test type inference specification for createDataFrame in PySpark




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22640: [SPARK-25621][SPARK-25622][TEST] Reduce test time of Buc...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22640
  
Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22295#discussion_r223173806
  
--- Diff: python/pyspark/sql/session.py ---
@@ -252,6 +255,20 @@ def newSession(self):
 """
 return self.__class__(self._sc, self._jsparkSession.newSession())
 
+@classmethod
+@since(2.5)
+def getActiveSession(cls):
+"""
+Returns the active SparkSession for the current thread, returned 
by the builder.
+>>> s = SparkSession.getActiveSession()
+>>> l = [('Alice', 1)]
+>>> rdd = s.sparkContext.parallelize(l)
+>>> df = s.createDataFrame(rdd, ['name', 'age'])
+>>> df.select("age").collect()
+[Row(age=1)]
+"""
+return cls._activeSession
--- End diff --

Yea, it should look like that


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22619
  
Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22448: [SPARK-25417][SQL] Improve findTightestCommonType to coe...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22448
  
@dilipbiswal, can you file another JIRA instead of SPARK-25417 specifically 
for type coercion?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22646: [SPARK-25654][SQL] Support for nested JavaBean arrays, l...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22646
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22649: [SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build ...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22649
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r223173637
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 can fail on special rows, the workaround is to incorporate the 
condition into the functions.
 
 .. note:: The user-defined functions do not take keyword arguments on 
the calling side.
+
+.. note:: The data type of returned `pandas.Series` from the 
user-defined functions should be
+matched with defined returnType. When there is mismatch between 
them, it is not guaranteed
+that the conversion by SparkSQL during serialization is correct at 
all and users might get
--- End diff --

>  an attempt will be made to cast the data and results should be checked 
for accuracy."

It sounds like the casting is intentional. As far as I can tell, the casting logic is not that clear compared to the SQL casting logic. Can we leave this as "not guaranteed" for now and document the casting logic here instead? Does Arrow have some kind of documentation for type conversion, BTW?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22227
  
Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22647: [SPARK-25655] [BUILD] Add -Pspark-ganglia-lgpl to the sc...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22647
  
Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22636: [SPARK-25629][TEST] Reduce ParquetFilterSuite: filter pu...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22636
  
Yea, it is not obvious and only saves a few seconds, so it might not be worth much. But it looks like an improvement because it fixes the test cases to test what the previous PR targeted. Wouldn't it be better just to go ahead rather than close this, since the PR is already open?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22647: [SPARK-25655] [BUILD] Add -Pspark-ganglia-lgpl to the sc...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22647
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22640: [SPARK-25621][SPARK-25622][TEST] Reduce test time of Buc...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22640
  
cc @cloud-fan too for sure.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22640: [SPARK-25621][SPARK-25622][TEST] Reduce test time...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22640#discussion_r222975187
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -66,7 +66,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
 .bucketBy(8, "j", "k")
 .saveAsTable("bucketed_table")
 
-  for (i <- 0 until 5) {
--- End diff --

Oh haha, I was struggling to see why it was 5 and 13, and it looks like it's in `df` :-)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222971397
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
-schema: StructType,
+dataType: DataType,
 columnNameOfCorruptRecord: String,
 isMultiLine: Boolean) {
-
-  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
   // This function takes 2 parameters: an optional partial result, and the 
bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return 
the partial result or a
   // row with all fields null. If the given schema contains a field for 
corrupted record, we will
   // set the bad record to this field, and set other fields according to 
the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
-if (corruptFieldIndex.isDefined) {
-  (row, badRecord) => {
-var i = 0
-while (i < actualSchema.length) {
-  val from = actualSchema(i)
-  resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, 
from.dataType)).orNull
-  i += 1
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = dataType match {
+case struct: StructType =>
+  val corruptFieldIndex = 
struct.getFieldIndex(columnNameOfCorruptRecord)
--- End diff --

Do you mind if I ask you to extract a private function for this one (L38 to L54) and define it around here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22295#discussion_r222959106
  
--- Diff: python/pyspark/sql/session.py ---
@@ -252,6 +255,20 @@ def newSession(self):
 """
 return self.__class__(self._sc, self._jsparkSession.newSession())
 
+@classmethod
+@since(2.5)
--- End diff --

Let's target 3.0 for this. Per https://github.com/apache/spark/commit/9bf397c0e45cb161f3f12f09bd2bf14ff96dc823, it looks like we are going ahead with 3.0 now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22295#discussion_r222958780
  
--- Diff: python/pyspark/sql/session.py ---
@@ -252,6 +255,20 @@ def newSession(self):
 """
 return self.__class__(self._sc, self._jsparkSession.newSession())
 
+@classmethod
+@since(2.5)
+def getActiveSession(cls):
+"""
+Returns the active SparkSession for the current thread, returned 
by the builder.
+>>> s = SparkSession.getActiveSession()
+>>> l = [('Alice', 1)]
+>>> rdd = s.sparkContext.parallelize(l)
+>>> df = s.createDataFrame(rdd, ['name', 'age'])
+>>> df.select("age").collect()
+[Row(age=1)]
+"""
+return cls._activeSession
--- End diff --

Yup.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21596: [SPARK-24601] Update Jackson to 2.9.6

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21596
  
Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22633
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22227
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222926650
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala 
---
@@ -51,6 +56,8 @@ object ParseMode extends Logging {
 case PermissiveMode.name => PermissiveMode
 case DropMalformedMode.name => DropMalformedMode
 case FailFastMode.name => FailFastMode
+case NullMalformedMode.name => NullMalformedMode
--- End diff --

I suggested keeping the previous behaviour only because I thought we were in transition to 2.5. Since we are going ahead with 3.0, I think I am okay with not keeping it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22635
  
Thanks for cc'ing me. Will take a look this week.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22633
  
Looks fine to me


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894056
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
--- End diff --

`foreachBatchFunction` -> `foreach_batch_function`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894110
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
--- End diff --

`epochId` -> `epoch_id`
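
To spell both renames out, the quoted Python example would then read as below (same assumed `streamingDF` as in the doc):

```
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
```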


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893790
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222894028
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
--- End diff --

4 space indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893841
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893720
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222893572
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend 

[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22633#discussion_r222893318
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 ---
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+spark = new TestSparkSession();
+input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input").toString();
+  }
+
+  @After
+  public void tearDown() {
+Utils.deleteRecursively(new File(input));
+spark.stop();
+spark = null;
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
+StreamingQuery query = spark
+.readStream()
+.textFile(input)
+.writeStream()
+.foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
+  @Override
+  public void call(Dataset<String> v1, Long v2) throws Exception {
+  }
+})
+.start();
+query.stop();
+  }
+
+  @Test
+  public void testForeachAPI() {
+StreamingQuery query = spark
+.readStream()
+.textFile(input)
+.writeStream()
+.foreach(new ForeachWriter<String>() {
+  @Override
+  public boolean open(long partitionId, long epochId) {
+return true;
+  }
+
+  @Override
+  public void process(String value) {
+  }
--- End diff --

tiny nit: I would just write `public void process(String value) { }`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22633#discussion_r222893114
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 ---
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+spark = new TestSparkSession();
+input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input").toString();
+  }
+
+  @After
+  public void tearDown() {
+Utils.deleteRecursively(new File(input));
+spark.stop();
+spark = null;
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
+StreamingQuery query = spark
+.readStream()
+.textFile(input)
+.writeStream()
+.foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
+  @Override
+  public void call(Dataset<String> v1, Long v2) throws Exception {
+  }
+})
+.start();
+query.stop();
+  }
+
+  @Test
+  public void testForeachAPI() {
+StreamingQuery query = spark
+.readStream()
--- End diff --

Nit: 2 space indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22619
  
Yup. Let me leave this open for a few more days, just in case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22637
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22637
  
Mind filling in the PR description, please?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22399
  
Let me trigger it in the next PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22399
  
@Fokko, let's follow the PR description format next time BTW, for instance, filling in "How was this patch tested?"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222885910
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 can fail on special rows, the workaround is to incorporate the 
condition into the functions.
 
 .. note:: The user-defined functions do not take keyword arguments on 
the calling side.
+
+.. note:: The data type of returned `pandas.Series` from the 
user-defined functions should be
+matched with defined returnType. When there is mismatch between 
them, it is not guaranteed
+that the conversion by SparkSQL during serialization is correct at 
all and users might get
--- End diff --

Maybe I am worrying too much, but how about just saying:

```
... defined returnType (see :meth:`types.to_arrow_type` and 
:meth:`types.from_arrow_type`). 
When there is mismatch between them, the conversion is not guaranteed.
```
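
For example, roughly the kind of mismatch being discussed, as a sketch assuming an existing `spark` session and with `plus_one` as a purely illustrative name: the UDF below declares `double` but the returned `pandas.Series` has an integer dtype, and it is that conversion during serialization which is not guaranteed.

```
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def plus_one(v):
    # The returned pandas.Series has dtype int64, not float64 as declared above.
    return (v + 1).astype('int64')

spark.range(3).select(plus_one('id')).show()
```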


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22610#discussion_r222885267
  
--- Diff: python/pyspark/worker.py ---
@@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type):
 arrow_return_type = to_arrow_type(return_type)
 
 def verify_result_length(*a):
+import pyarrow as pa
 result = f(*a)
 if not hasattr(result, "__len__"):
 raise TypeError("Return type of the user-defined function 
should be "
 "Pandas.Series, but is 
{}".format(type(result)))
 if len(result) != len(a[0]):
 raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
"expected %d, got %d" % (len(a[0]), 
len(result)))
+
+# Ensure return type of Pandas.Series matches the arrow return 
type of the user-defined
+# function. Otherwise, we may produce incorrect serialized data.
+# Note: for timestamp type, we only need to ensure both types are 
timestamp because the
+# serializer will do conversion.
+try:
+arrow_type_of_result = pa.from_numpy_dtype(result.dtype)
+both_are_timestamp = 
pa.types.is_timestamp(arrow_type_of_result) and \
+pa.types.is_timestamp(arrow_return_type)
+if not both_are_timestamp and arrow_return_type != 
arrow_type_of_result:
+print("WARN: Arrow type %s of return Pandas.Series of the 
user-defined function's "
--- End diff --

Yes, I support just fixing the doc here first and making a separate PR later if needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22227
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22622
  
This looks like it has now been fixed in https://github.com/apache/spark/commit/5ae20cf1a96a33f5de4435fcfb55914d64466525


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22622
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22626: [SPARK-25638][SQL] Adding new function - to_csv()

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22626
  
add to whitelist


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22626: [SPARK-25638][SQL] Adding new function - to_csv()

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22626
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21596: [SPARK-24601] Update Jackson to 2.9.6

2018-10-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21596
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22622: [SPARK-25635][SQL][BUILD] Support selective direc...

2018-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22622#discussion_r222535553
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
 ---
@@ -115,6 +116,71 @@ abstract class OrcSuite extends OrcTest with 
BeforeAndAfterAll {
 }
   }
 
+  protected def testSelectiveDictionaryEncoding(isSelective: Boolean) {
+val tableName = "orcTable"
+
+withTempDir { dir =>
+  withTable(tableName) {
+val sqlStatement = orcImp match {
+  case "native" =>
+s"""
+   |CREATE TABLE $tableName (zipcode STRING, uuid STRING, 
value DOUBLE)
+   |USING ORC
+   |OPTIONS (
+   |  path '${dir.toURI}',
+   |  orc.dictionary.key.threshold '1.0',
+   |  orc.column.encoding.direct 'uuid'
--- End diff --

How about changing the column name? I thought it was some kind of enum representing the encoding kind.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22622: [SPARK-25635][SQL][BUILD] Support selective direc...

2018-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22622#discussion_r222535182
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
 ---
@@ -115,6 +116,71 @@ abstract class OrcSuite extends OrcTest with 
BeforeAndAfterAll {
 }
   }
 
+  protected def testSelectiveDictionaryEncoding(isSelective: Boolean) {
+val tableName = "orcTable"
+
+withTempDir { dir =>
+  withTable(tableName) {
+val sqlStatement = orcImp match {
+  case "native" =>
+s"""
+   |CREATE TABLE $tableName (zipcode STRING, uuid STRING, 
value DOUBLE)
+   |USING ORC
+   |OPTIONS (
+   |  path '${dir.toURI}',
+   |  orc.dictionary.key.threshold '1.0',
+   |  orc.column.encoding.direct 'uuid'
+   |)
+""".stripMargin
+  case "hive" =>
+s"""
+   |CREATE TABLE $tableName (zipcode STRING, uuid STRING, 
value DOUBLE)
+   |STORED AS ORC
+   |LOCATION '${dir.toURI}'
+   |TBLPROPERTIES (
+   |  orc.dictionary.key.threshold '1.0',
+   |  hive.exec.orc.dictionary.key.size.threshold '1.0',
+   |  orc.column.encoding.direct 'uuid'
+   |)
+""".stripMargin
+  case impl =>
+throw new UnsupportedOperationException(s"Unknown ORC 
implementation: $impl")
+}
+
+sql(sqlStatement)
+sql(s"INSERT INTO $tableName VALUES ('94086', 
'random-uuid-string', 0.0)")
+
+val partFiles = dir.listFiles()
+  .filter(f => f.isFile && !f.getName.startsWith(".") && 
!f.getName.startsWith("_"))
+assert(partFiles.length === 1)
+
+val orcFilePath = new Path(partFiles.head.getAbsolutePath)
+val readerOptions = OrcFile.readerOptions(new Configuration())
+val reader = OrcFile.createReader(orcFilePath, readerOptions)
+var recordReader: RecordReaderImpl = null
+try {
+  recordReader = reader.rows.asInstanceOf[RecordReaderImpl]
+
+  // Check the kind
+  val stripe = 
recordReader.readStripeFooter(reader.getStripes.get(0))
+  if (isSelective) {
+assert(stripe.getColumns(1).getKind === DICTIONARY_V2)
--- End diff --

@dongjoon-hyun, how about:

```
assert(stripe.getColumns(1).getKind === DICTIONARY_V2)
assert(stripe.getColumns(3).getKind === DIRECT)
if (isSelective) {
  assert(stripe.getColumns(2).getKind === DIRECT_V2)
} else {
  assert(stripe.getColumns(2).getKind === DICTIONARY_V2)
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22622: [SPARK-25635][SQL][BUILD] Support selective direc...

2018-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22622#discussion_r222535254
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
 ---
@@ -284,4 +350,8 @@ class OrcSourceSuite extends OrcSuite with 
SharedSQLContext {
   test("Check BloomFilter creation") {
 testBloomFilterCreation(Kind.BLOOM_FILTER_UTF8) // After ORC-101
   }
+
+  test("Enforce direct encoding column-wise selectively") {
+testSelectiveDictionaryEncoding(true)
--- End diff --

How about `testSelectiveDictionaryEncoding(isSelective = true)`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22593: [Streaming][DOC] Fix typo & format in DataStreamWriter.s...

2018-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22593
  
ping @niofire 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning when retu...

2018-10-03 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22610
  
One thing that clearly looks needed is adding some documentation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


