[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223744025 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val actualSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) -val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) { - val firstLine = maybeFirstLine.get - val parser = new CsvParser(parsedOptions.asParserSettings) - val columnNames = parser.parseLine(firstLine) - CSVDataSource.checkHeaderColumnNames( +val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine => + val headerChecker = new CSVHeaderChecker( actualSchema, -columnNames, -csvDataset.getClass.getCanonicalName, -parsedOptions.enforceSchema, -sparkSession.sessionState.conf.caseSensitiveAnalysis) +parsedOptions, +source = s"CSV source: ${csvDataset.getClass.getCanonicalName}") --- End diff -- Makes sense. If that's just `toString`, of course I can fix it here since the change is small although it's orthogonal. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223743548 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource { maybeFirstLine.map(csvParser.parseLine(_)) match { case Some(firstRow) if firstRow != null => val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis -val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions) +val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions) --- End diff -- Because most of the code here uses the `CSVUtils...` form; I just followed that convention.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223728765 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import com.univocity.parsers.csv.CsvParser + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * Checks that column names in a CSV header and field names in the schema are the same + * by taking into account case sensitivity. + * + * @param schema provided (or inferred) schema to which CSV must conform. + * @param options parsed CSV options. + * @param source name of CSV source that are currently checked. It is used in error messages. + * @param isStartOfFile indicates if the currently processing partition is the start of the file. + * if unknown or not applicable (for instance when the input is a dataset), + * can be omitted. + */ +class CSVHeaderChecker( --- End diff -- It's under execution package which is meant to be private. 
Since it's accessed in `DataFrameReader`, it would normally need to be `private[sql]`, but that modifier was removed in SPARK-16964 for this very reason.
[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22675#discussion_r223727369 --- Diff: docs/ml-datasource.md --- @@ -0,0 +1,51 @@ +--- +layout: global +title: Data sources +displayTitle: Data sources +--- + +In this section, we introduce how to use data source in ML to load data. +Beside some general data sources "parquat", "csv", "json", "jdbc", we also provide some specific data source for ML. + +**Table of Contents** + +* This will become a table of contents (this text will be scraped). +{:toc} + +## Image data source + +This image data source is used to load libsvm data files from directory. + + + +[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource) +implements Spark SQL data source API for loading image data as DataFrame. +The loaded DataFrame has one StructType column: "image". containing image data stored as image schema. + +{% highlight scala %} +scala> spark.read.format("image").load("data/mllib/images/origin") +res1: org.apache.spark.sql.DataFrame = [image: struct] +{% endhighlight %} + + + +[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html) --- End diff -- cc @cloud-fan and @gatorsmile, am I missing something?
[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22675#discussion_r223725196 --- Diff: docs/ml-datasource.md --- @@ -0,0 +1,51 @@ +--- +layout: global +title: Data sources +displayTitle: Data sources +--- + +In this section, we introduce how to use data source in ML to load data. +Beside some general data sources "parquat", "csv", "json", "jdbc", we also provide some specific data source for ML. + +**Table of Contents** + +* This will become a table of contents (this text will be scraped). +{:toc} + +## Image data source + +This image data source is used to load libsvm data files from directory. + + + +[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource) +implements Spark SQL data source API for loading image data as DataFrame. +The loaded DataFrame has one StructType column: "image". containing image data stored as image schema. + +{% highlight scala %} +scala> spark.read.format("image").load("data/mllib/images/origin") +res1: org.apache.spark.sql.DataFrame = [image: struct] +{% endhighlight %} + + + +[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html) --- End diff -- I meant that (external) Avro was merged into `external/...` in Apache Spark as a separate module for the reason above, while the image data source was merged into Spark's main code rather than as a separate module. I don't object to bringing an external source into Apache Spark, and I don't doubt your judgment; I was just wondering why this lives in Spark's main code when the ideal approach would be to put it under `external/...`.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r223696096 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -547,4 +548,27 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Map("pretty" -> "true"))), Seq(Row(expected))) } + + test("from_json invalid json - check modes") { +val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), --- End diff -- @MaxGekk, can we add a test with a manually specified malformed-record column in the schema? For instance, https://github.com/apache/spark/blob/a8a1ac01c4732f8a738b973c8486514cd88bf99b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala#L1125-L1133
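The semantics under discussion can be illustrated outside Spark. Below is a hedged, pure-Python sketch (not PySpark; the function name and corrupt-field handling are illustrative only) of what `PERMISSIVE` mode does with a malformed record when a corrupt-record column is declared in the schema:

```python
import json

def from_json_permissive(value, fields, corrupt_field=None):
    # PERMISSIVE-style parsing: a malformed record becomes a row of nulls,
    # and the raw input text is preserved in the corrupt-record column when
    # the schema declares one (cf. columnNameOfCorruptRecord in Spark).
    try:
        obj = json.loads(value)
    except ValueError:
        row = {f: None for f in fields}
        if corrupt_field is not None:
            row[corrupt_field] = value  # keep the malformed input verbatim
        return row
    row = {f: obj.get(f) for f in fields}
    if corrupt_field is not None:
        row[corrupt_field] = None
    return row
```

With the two records from the test above, `'{"a" 1}'` yields a null row carrying the raw text, while `'{"a": 2}'` parses normally.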
[GitHub] spark issue #22656: [SPARK-25669][SQL] Check CSV header only when it exists
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22656 Merged to master and branch-2.4.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223594430 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import com.univocity.parsers.csv.CsvParser + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * Checks that column names in a CSV header and field names in the schema are the same + * by taking into account case sensitivity. + * + * @param schema provided (or inferred) schema to which CSV must conform. + * @param options parsed CSV options. + * @param source name of CSV source that are currently checked. It is used in error messages. + * @param isStartOfFile indicates if the currently processing partition is the start of the file. + * if unknown or not applicable (for instance when the input is a dataset), + * can be omitted. 
+ */ +class CSVHeaderChecker( +schema: StructType, +options: CSVOptions, +source: String, +isStartOfFile: Boolean = false) extends Logging { + + // Indicates if it is set to `false`, comparison of column names and schema field + // names is not case sensitive. + private val caseSensitive = SQLConf.get.caseSensitiveAnalysis + + // Indicates if it is `true`, column names are ignored otherwise the CSV column + // names are checked for conformance to the schema. In the case if + // the column name don't conform to the schema, an exception is thrown. + private val enforceSchema = options.enforceSchema + + /** + * Checks that column names in a CSV header and field names in the schema are the same + * by taking into account case sensitivity. + * + * @param columnNames names of CSV columns that must be checked against to the schema. + */ + private def checkHeaderColumnNames(columnNames: Array[String]): Unit = { +if (columnNames != null) { + val fieldNames = schema.map(_.name).toIndexedSeq + val (headerLen, schemaSize) = (columnNames.size, fieldNames.length) + var errorMessage: Option[String] = None + + if (headerLen == schemaSize) { +var i = 0 +while (errorMessage.isEmpty && i < headerLen) { + var (nameInSchema, nameInHeader) = (fieldNames(i), columnNames(i)) + if (!caseSensitive) { +// scalastyle:off caselocale +nameInSchema = nameInSchema.toLowerCase +nameInHeader = nameInHeader.toLowerCase +// scalastyle:on caselocale + } + if (nameInHeader != nameInSchema) { +errorMessage = Some( + s"""|CSV header does not conform to the schema. + | Header: ${columnNames.mkString(", ")} + | Schema: ${fieldNames.mkString(", ")} + |Expected: ${fieldNames(i)} but found: ${columnNames(i)} + |$source""".stripMargin) --- End diff -- This is the only diff. Previously it was ``` |CSV file: $fileName""".stripMargin) ``` which ended up producing the class of the source here.
See https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512. This is the only change in this method.
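The comparison logic described above fits in a few lines. The following is a rough, hedged Python approximation of what `checkHeaderColumnNames` does (the function name and return convention are illustrative; Spark's version also consults `enforceSchema` to decide between a warning and an exception, which this sketch omits):

```python
def check_header_column_names(column_names, field_names,
                              case_sensitive=False, source="CSV source"):
    # Compare header names against schema field names, optionally ignoring
    # case, and return an error message on the first mismatch (None if OK).
    if column_names is None:
        return None
    if len(column_names) != len(field_names):
        return (f"Number of columns in the CSV header ({len(column_names)}) "
                f"does not match the schema ({len(field_names)}). {source}")
    for name_in_schema, name_in_header in zip(field_names, column_names):
        a = name_in_schema if case_sensitive else name_in_schema.lower()
        b = name_in_header if case_sensitive else name_in_header.lower()
        if a != b:
            return ("CSV header does not conform to the schema.\n"
                    f" Header: {', '.join(column_names)}\n"
                    f" Schema: {', '.join(field_names)}\n"
                    f"Expected: {name_in_schema} but found: {name_in_header}\n"
                    f"{source}")
    return None
```

With case-insensitive comparison, `["a", "B"]` conforms to a schema `["A", "b"]`, while a genuinely different name produces the message shown in the diff.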
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223594011 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import com.univocity.parsers.csv.CsvParser + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * Checks that column names in a CSV header and field names in the schema are the same + * by taking into account case sensitivity. + * + * @param schema provided (or inferred) schema to which CSV must conform. + * @param options parsed CSV options. + * @param source name of CSV source that are currently checked. It is used in error messages. + * @param isStartOfFile indicates if the currently processing partition is the start of the file. + * if unknown or not applicable (for instance when the input is a dataset), + * can be omitted. 
+ */ +class CSVHeaderChecker( +schema: StructType, +options: CSVOptions, +source: String, +isStartOfFile: Boolean = false) extends Logging { + + // Indicates if it is set to `false`, comparison of column names and schema field + // names is not case sensitive. + private val caseSensitive = SQLConf.get.caseSensitiveAnalysis + + // Indicates if it is `true`, column names are ignored otherwise the CSV column + // names are checked for conformance to the schema. In the case if + // the column name don't conform to the schema, an exception is thrown. + private val enforceSchema = options.enforceSchema + + /** + * Checks that column names in a CSV header and field names in the schema are the same + * by taking into account case sensitivity. + * + * @param columnNames names of CSV columns that must be checked against to the schema. + */ + private def checkHeaderColumnNames(columnNames: Array[String]): Unit = { --- End diff -- It was moved as-is, except for the parameters in its signature.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223593838 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala --- @@ -273,44 +274,47 @@ private[csv] object UnivocityParser { inputStream: InputStream, shouldDropHeader: Boolean, tokenizer: CsvParser): Iterator[Array[String]] = { -convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens) +val handleHeader: () => Unit = + () => if (shouldDropHeader) tokenizer.parseNext + +convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens) } /** * Parses a stream that contains CSV strings and turns it into an iterator of rows. */ def parseStream( inputStream: InputStream, - shouldDropHeader: Boolean, parser: UnivocityParser, - schema: StructType, - checkHeader: Array[String] => Unit): Iterator[InternalRow] = { + headerChecker: CSVHeaderChecker, + schema: StructType): Iterator[InternalRow] = { val tokenizer = parser.tokenizer val safeParser = new FailureSafeParser[Array[String]]( input => Seq(parser.convert(input)), parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord, parser.options.multiLine) -convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens => + +val handleHeader: () => Unit = + () => headerChecker.checkHeaderColumnNames(tokenizer) --- End diff -- This matches the code structure of `parseStream` and `parseIterator`, which are used in the multiLine and non-multiLine modes.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223593894 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala --- @@ -90,6 +89,49 @@ object CSVUtils { None } } + + /** + * Generates a header from the given row which is null-safe and duplicate-safe. + */ + def makeSafeHeader( --- End diff -- It was moved as-is.
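As a rough illustration of what the moved `makeSafeHeader` does, here is a hedged Python approximation (the renaming scheme shown — positional `_c{i}` defaults and an index suffix for duplicates — follows the scaladoc above, but the exact details of Spark's implementation are simplified):

```python
from collections import Counter

def make_safe_header(row, case_sensitive=False, null_value=""):
    # Null-safe and duplicate-safe header generation: missing names become
    # positional defaults (_c0, _c1, ...) and duplicated names are
    # disambiguated by appending the column index.
    keys = [(v if case_sensitive else (v or "").lower()) for v in row]
    counts = Counter(k for k in keys if k)  # empty names handled separately
    duplicates = {k for k, n in counts.items() if n > 1}
    header = []
    for i, value in enumerate(row):
        if value is None or value == "" or value == null_value:
            header.append(f"_c{i}")  # positional default for a missing name
        else:
            key = value if case_sensitive else value.lower()
            header.append(f"{value}{i}" if key in duplicates else value)
    return header
```

For example, a first row `["a", "A", "", "b"]` with case-insensitive analysis produces `["a0", "A1", "_c2", "b"]`.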
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22676#discussion_r223593701 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala --- @@ -273,44 +274,47 @@ private[csv] object UnivocityParser { inputStream: InputStream, shouldDropHeader: Boolean, tokenizer: CsvParser): Iterator[Array[String]] = { -convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens) +val handleHeader: () => Unit = + () => if (shouldDropHeader) tokenizer.parseNext --- End diff -- This is used in the schema-inference path, where we don't check the header; here it only drops the header.
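In other words, the schema-inference path's header handling reduces to conditionally consuming one record. A minimal Python sketch of that behavior (illustrative function name; Spark does this with a `() => Unit` closure over the tokenizer):

```python
def lines_without_header(lines, drop_header):
    # Mirror of the handleHeader closure in the inference path: when the
    # header flag is set, consume the first record; never validate it.
    it = iter(lines)
    if drop_header:
        next(it, None)  # drop the header line, if any
    return it
```

So `lines_without_header(["h", "1", "2"], True)` yields only the data lines, while with `False` everything passes through untouched.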
[GitHub] spark issue #22676: [SPARK-25684][SQL] Organize header related codes in CSV ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22676 cc @cloud-fan and @MaxGekk
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/22676 [SPARK-25684][SQL] Organize header related codes in CSV datasource ## What changes were proposed in this pull request? 1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is). Rationale: - Historically, in the first round of refactoring (which I did), I intended to put all CSV-specific handling (like options, filtering, and header extraction) in one place. - See `JsonDataSource`. Now `CSVDataSource` is quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we'd better match them as much as we can. 2. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is). Rationale: - Similar reasons as in 1. 3. Put the `enforceSchema` logic into `CSVHeaderChecker`. - The header-checking and column-pruning logic was added (per https://github.com/apache/spark/pull/20894 and https://github.com/apache/spark/pull/21296), but some of the code, such as https://github.com/apache/spark/pull/21296, is duplicated. - Also, the header-checking code is scattered here and there, which is quite error-prone; we'd better put it in a single place. See https://github.com/apache/spark/pull/22656. ## How was this patch tested? Existing tests should cover this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HyukjinKwon/spark refactoring-csv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22676.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22676 commit 56906680ab7d5d63be04bac2c3a19bb52baa3025 Author: hyukjinkwon Date: 2018-10-09T07:26:08Z Organize header related codes in CSV datasource
[GitHub] spark pull request #22656: [SPARK-25669][SQL] Check CSV header only when it ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22656#discussion_r223566522 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -505,7 +505,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val actualSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) -val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine => +val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) { --- End diff -- LGTM, but it really needs some refactoring. Let me give it a shot.
[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22675#discussion_r223552437 --- Diff: docs/ml-datasource.md --- @@ -0,0 +1,51 @@ +--- +layout: global +title: Data sources +displayTitle: Data sources +--- + +In this section, we introduce how to use data source in ML to load data. +Beside some general data sources "parquat", "csv", "json", "jdbc", we also provide some specific data source for ML. + +**Table of Contents** + +* This will become a table of contents (this text will be scraped). +{:toc} + +## Image data source + +This image data source is used to load libsvm data files from directory. + + + +[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource) +implements Spark SQL data source API for loading image data as DataFrame. +The loaded DataFrame has one StructType column: "image". containing image data stored as image schema. + +{% highlight scala %} +scala> spark.read.format("image").load("data/mllib/images/origin") +res1: org.apache.spark.sql.DataFrame = [image: struct] +{% endhighlight %} + + + +[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html) --- End diff -- cc @mengxr as well
[GitHub] spark pull request #22675: [SPARK-25347][ML][DOC] Spark datasource for image...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22675#discussion_r223552386 --- Diff: docs/ml-datasource.md --- @@ -0,0 +1,51 @@ +--- +layout: global +title: Data sources +displayTitle: Data sources +--- + +In this section, we introduce how to use data source in ML to load data. +Beside some general data sources "parquat", "csv", "json", "jdbc", we also provide some specific data source for ML. + +**Table of Contents** + +* This will become a table of contents (this text will be scraped). +{:toc} + +## Image data source + +This image data source is used to load libsvm data files from directory. + + + +[`ImageDataSource`](api/scala/index.html#org.apache.spark.ml.source.image.ImageDataSource) +implements Spark SQL data source API for loading image data as DataFrame. +The loaded DataFrame has one StructType column: "image". containing image data stored as image schema. + +{% highlight scala %} +scala> spark.read.format("image").load("data/mllib/images/origin") +res1: org.apache.spark.sql.DataFrame = [image: struct] +{% endhighlight %} + + + +[`ImageDataSource`](api/java/org/apache/spark/ml/source/image/ImageDataSource.html) --- End diff -- Out of curiosity, why did we put the image source inside of Spark, rather than in a separate module? (see also https://github.com/apache/spark/pull/21742#discussion_r201552008). Avro was put in as a separate module.
[GitHub] spark pull request #22653: [SPARK-25659][PYTHON][TEST] Test type inference s...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22653#discussion_r223552056 --- Diff: python/pyspark/sql/tests.py --- @@ -1149,6 +1149,75 @@ def test_infer_schema(self): result = self.spark.sql("SELECT l[0].a from test2 where d['key'].d = '2'") self.assertEqual(1, result.head()[0]) +def test_infer_schema_specification(self): +from decimal import Decimal + +class A(object): +def __init__(self): +self.a = 1 + +data = [ +True, +1, +"a", +u"a", --- End diff -- Fair point. I'll change it when I fix some code around here.
[GitHub] spark issue #22615: [SPARK-25016][BUILD][CORE] Remove support for Hadoop 2.6
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22615 I want to see the configurations ..
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r223550790 --- Diff: docs/sql-programming-guide.md --- @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. --- End diff -- Last option sounds better to me but can we fill the corrupt row when the corrupt field name is specified in the schema?
[GitHub] spark issue #22653: [SPARK-25659][PYTHON][TEST] Test type inference specific...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22653 Merged to master. Thank you @BryanCutler. I'll make sure to follow up on the related items.
[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22635 Yea, the same issue exists in Pandas UDFs too (I quickly double-checked). This PR fixes it. FYI, that code path is essentially one shared place.
[GitHub] spark issue #22653: [SPARK-25659][PYTHON][TEST] Test type inference specific...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22653 Let me open a JIRA after taking a further look. I checked a few places in the code, and it looks like it's not easy to support `None`, for instance.
[GitHub] spark pull request #22653: [SPARK-25659][PYTHON][TEST] Test type inference s...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22653#discussion_r223520528 --- Diff: python/pyspark/sql/tests.py --- @@ -1149,6 +1149,75 @@ def test_infer_schema(self): result = self.spark.sql("SELECT l[0].a from test2 where d['key'].d = '2'") self.assertEqual(1, result.head()[0]) +def test_infer_schema_specification(self): +from decimal import Decimal + +class A(object): +def __init__(self): +self.a = 1 + +data = [ +True, +1, +"a", +u"a", +datetime.date(1970, 1, 1), +datetime.datetime(1970, 1, 1, 0, 0), +1.0, +array.array("d", [1]), +[1], +(1, ), +{"a": 1}, +bytearray(1), +Decimal(1), +Row(a=1), +Row("a")(1), +A(), --- End diff -- Yea, it uses the `__dict__` attribute; looks like that's not possible in UDFs. Possibly an issue.
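For reference, `vars()` exposes the same `__dict__` attribute that schema inference reads from a plain object. A minimal sketch (the class `A` mirrors the one in the test above; `infer_fields` is an illustrative name, not a PySpark API):

```python
class A(object):
    def __init__(self):
        self.a = 1

def infer_fields(obj):
    # Schema inference for plain objects reads instance attributes via the
    # object's __dict__ attribute, which is what vars() returns.
    return dict(vars(obj))
```

So `infer_fields(A())` yields a single field `a`, which is why an `A()` instance can be inferred as a one-column row here.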
[GitHub] spark pull request #22545: [SPARK-25525][SQL][PYSPARK] Do not update conf fo...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22545#discussion_r223408456 --- Diff: python/pyspark/sql/session.py --- @@ -156,7 +156,7 @@ def getOrCreate(self): default. >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate() ->>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1" +>>> s1.conf.get("k1") == "v1" --- End diff -- In that case, we might have to add the behavior changes from https://github.com/apache/spark/pull/18536 to the migration guide as well.
[GitHub] spark issue #22657: [SPARK-25670][TEST] Reduce number of tested timezones in...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22657 I agree. We don't test the `commons-lang3` library here.
[GitHub] spark pull request #22631: [SPARK-25605][TESTS] Run cast string to timestamp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22631#discussion_r223286771 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala --- @@ -110,7 +112,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } test("cast string to timestamp") { -for (tz <- ALL_TIMEZONES) { +for (tz <- Random.shuffle(ALL_TIMEZONES).take(50)) { --- End diff -- I mean, some tests with randomized input (say, an integer range) are fine in common sense, but this case is different, isn't it?
[GitHub] spark pull request #22631: [SPARK-25605][TESTS] Run cast string to timestamp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22631#discussion_r223285813 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala --- @@ -110,7 +112,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } test("cast string to timestamp") { -for (tz <- ALL_TIMEZONES) { +for (tz <- Random.shuffle(ALL_TIMEZONES).take(50)) { --- End diff -- I think tests need to be deterministic in general as well. In this particular case, ideally we should categorize timezones and test a fixed set: for instance, timezones with DST, timezones without DST, and some exceptions, such as this particular case which Python 3.6 addressed lately (https://github.com/python/cpython/blob/e42b705188271da108de42b55d9344642170aa2b/Lib/datetime.py#L1572-L1574), IMHO. Of course, this approach requires a lot of investigation and overhead. So, as an alternative, I would be inclined to go for Sean's approach (https://github.com/apache/spark/pull/22631/files#r223224573) for this particular case. As for randomness, I think we should primarily have a deterministic set of tests first. Maybe we could additionally have some randomized input to cover cases we haven't foreseen, but that's secondary.
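One middle ground between full determinism and broad coverage is to seed the sampling, so a failing run can be reproduced exactly. A hedged Python sketch of that idea (the timezone list below is a small stand-in, not Spark's actual `ALL_TIMEZONES`, and `pick_timezones` is a hypothetical helper):

```python
import random

# Stand-in list; CastSuite iterates real java.util.TimeZone ids in Scala.
ALL_TIMEZONES = [
    "UTC", "America/Los_Angeles", "Europe/Paris", "Asia/Seoul",
    "Australia/Lord_Howe", "Pacific/Kiritimati",
]

def pick_timezones(seed, n=3):
    # A fixed seed makes the "random" subset deterministic, so a test
    # failure can be reproduced later, unlike an unseeded
    # Random.shuffle(...).take(n).
    return random.Random(seed).sample(ALL_TIMEZONES, n)

assert pick_timezones(42) == pick_timezones(42)  # reproducible across runs
```

The same seed could be logged by the test harness, which keeps the reduced run time while preserving reproducibility.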
[GitHub] spark issue #22655: [SPARK-25666][PYTHON] Internally document type conversio...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22655 Merged to master.
[GitHub] spark issue #22655: [SPARK-25666][PYTHON] Internally document type conversio...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22655 I am getting this in. This is an ongoing effort and it just documents them internally for now.
[GitHub] spark issue #22669: [SPARK-25677] [Doc] spark.io.compression.codec = org.apa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22669 Merged to master and branch-2.4.
[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22635 Merged to master and branch-2.4.
[GitHub] spark issue #22667: [SPARK-25673][BUILD] Remove Travis CI which enables Java...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22667 Merged to master and branch-2.4. Thanks @srowen and @felixcheung.
[GitHub] spark issue #22669: [SPARK-25677] [Doc] spark.io.compression.codec = org.apa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22669 ok to test
[GitHub] spark issue #22655: [WIP][SPARK-25666][PYTHON] Internally document type conv...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22655 Let me make this table for Pandas UDFs too and then open another JIRA (or a mailing list thread) to discuss this further. I need more investigation to propose the desired behaviour targeting 3.0.
[GitHub] spark issue #22665: add [openjdk11] to Travis build matrix
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22665 @sullis, this file will be obsolete per https://github.com/apache/spark/pull/22667. This shall be closed.
[GitHub] spark pull request #22667: [SPARK-25673][BUILD] Remove Travis CI which enabl...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/22667

[SPARK-25673][BUILD] Remove Travis CI which enables Java lint check

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/12980 added Travis CI file mainly for linter because we disabled Java lint check in Jenkins. It's enabled as of https://github.com/apache/spark/pull/21399 and now SBT runs it. Looks we can now remove the file added before.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-25673

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22667.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22667

commit 7a535db83ba4b47c489166fb2b149fbb32b0aba4
Author: hyukjinkwon
Date: 2018-10-08T02:00:51Z

    Remove Travis CI which enables Java lint check
[GitHub] spark issue #22667: [SPARK-25673][BUILD] Remove Travis CI which enables Java...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22667 cc @srowen and @dongjoon-hyun
[GitHub] spark issue #22659: [SPARK-25623][TEST] Reduce test time of LogisticRegressi...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22659 ok to test
[GitHub] spark issue #22653: [SPARK-25659][PYTHON][TEST] Test type inference specific...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22653 @BryanCutler and @viirya, would you mind taking a look please?
[GitHub] spark pull request #22655: [SPARK-25666][PYTHON] Internally document type co...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22655#discussion_r223218608 --- Diff: python/pyspark/sql/functions.py --- @@ -2733,6 +2733,33 @@ def udf(f=None, returnType=StringType()): | 8| JOHN DOE| 22| +--+--++ """ + +# The following table shows most of Python data and SQL type conversions in normal UDFs that +# are not yet visible to the user. Some of behaviors are buggy and might be changed in the near +# future. The table might have to be eventually documented externally. +# Please see SPARK-25666's PR to see the codes in order to generate the table below. +# +# +-+--+--+--+---+--+--++-+--+--+-++--+--+--+-+-+ # noqa +# |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)|1(long)|a(str)|a(unicode)| 1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)| (1,)(tuple)|ABC(bytearray)|1(Decimal)|{'a': 1}(dict)|Row(a=1)(Row)|Row(a=1)(Row)| # noqa --- End diff -- Right, one was `Row(a=1)` and the other one was namedtuple approach `Row("a")(1)`. Let me try to update. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22655: [SPARK-25666][PYTHON] Internally document type co...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22655#discussion_r223218561 --- Diff: python/pyspark/sql/functions.py --- @@ -2733,6 +2733,33 @@ def udf(f=None, returnType=StringType()): | 8| JOHN DOE| 22| +--+--++ """ + +# The following table shows most of Python data and SQL type conversions in normal UDFs that +# are not yet visible to the user. Some of behaviors are buggy and might be changed in the near +# future. The table might have to be eventually documented externally. +# Please see SPARK-25666's PR to see the codes in order to generate the table below. +# +# +-+--+--+--+---+--+--++-+--+--+-++--+--+--+-+-+ # noqa +# |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)|1(long)|a(str)|a(unicode)| 1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)| (1,)(tuple)|ABC(bytearray)|1(Decimal)|{'a': 1}(dict)|Row(a=1)(Row)|Row(a=1)(Row)| # noqa +# +-+--+--+--+---+--+--++-+--+--+-++--+--+--+-+-+ # noqa +# | null| None| None| None| None| None| None|None| None| None| None| None|None| None| None| None|X|X| # noqa +# | boolean| None| True| None| None| None| None|None| None| None| None| None|None| None| None| None|X|X| # noqa +# | tinyint| None| None| 1| 1| None| None|None| None| None| None| None|None| None| None| None|X|X| # noqa +# | smallint| None| None| 1| 1| None| None|None| None| None| None| None|None| None| None| None|X|X| # noqa +# | int| None| None| 1| 1| None| None|None| None| None| None| None|None| None| None| None|X|X| # noqa +# | bigint| None| None| 1| 1| None| None|None| None| None| None| None|None| None| None| None|X|X| # noqa +# | string| None| true| 1| 1| a| a|java.util.Gregori...| java.util.Gregori...| 1.0| [I@7f1970e1| [1]|[Ljava.lang.Objec...| [B@284838a9| 1| {a=1}|X|X| # noqa --- End diff -- Hm .. I see the type is not clear here. Let me think about this a bit more. `[B@284838a9` is a quite buggy behaviour - we should fix. 
So I was thinking of documenting this internally, since we already spent a lot of time figuring out how it works for each case individually (at https://github.com/apache/spark/pull/20163).
[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22635 Nice catch @viirya. LGTM.
[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22635#discussion_r223218449 --- Diff: python/pyspark/accumulators.py --- @@ -109,10 +109,14 @@ def _deserialize_accumulator(aid, zero_value, accum_param): from pyspark.accumulators import _accumulatorRegistry -accum = Accumulator(aid, zero_value, accum_param) -accum._deserialized = True -_accumulatorRegistry[aid] = accum -return accum +# If this certain accumulator was deserialized, don't overwrite it. +if aid in _accumulatorRegistry: --- End diff -- Ah, so the problem is that this accumulator is de/serialized multiple times and `_deserialize_accumulator` modifies the global state multiple times. I see. LGTM.
[GitHub] spark pull request #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting des...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22635#discussion_r223218361 --- Diff: python/pyspark/sql/tests.py --- @@ -3603,6 +3603,31 @@ def test_repr_behaviors(self): self.assertEquals(None, df._repr_html_()) self.assertEquals(expected, df.__repr__()) +# SPARK-25591 +def test_same_accumulator_in_udfs(self): +from pyspark.sql.functions import udf + +data_schema = StructType([StructField("a", DoubleType(), True), + StructField("b", DoubleType(), True)]) +data = self.spark.createDataFrame([[1.0, 2.0]], schema=data_schema) + +test_accum = self.sc.accumulator(0.0) + +def first_udf(x): +test_accum.add(1.0) +return x + +def second_udf(x): +test_accum.add(100.0) +return x + +func_udf = udf(first_udf, DoubleType()) +func_udf2 = udf(second_udf, DoubleType()) +data = data.withColumn("out1", func_udf(data["a"])) +data = data.withColumn("out2", func_udf2(data["b"])) +data.collect() +self.assertEqual(test_accum.value, 101) --- End diff -- @viirya, can we just use int for data and accumulator as well in this test case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22610: [SPARK-25461][PySpark][SQL] Add document for mismatch be...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22610 Merged to master.
[GitHub] spark pull request #22610: [SPARK-25461][PySpark][SQL] Add document for mism...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r223217249 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,12 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType (see :meth:`types.to_arrow_type` and +:meth:`types.from_arrow_type`). When there is mismatch between them, Spark might do +conversion on returned data. The conversion is not guaranteed to be correct and results +should be checked for accuracy by users. --- End diff -- I am merging this since this describes the current status but let's make it clear and try to get rid of this note within 3.0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22647: [SPARK-25655] [BUILD] Add -Pspark-ganglia-lgpl to...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22647#discussion_r223186506 --- Diff: external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala --- @@ -64,11 +64,12 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) val dmax = propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX) val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) -.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) +.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase(Locale.Root))) --- End diff -- That's odd. Let me check soon, within a few days.
[GitHub] spark issue #22655: [SPARK-25666][PYTHON] Internally document type conversio...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22655 cc @cloud-fan, @viirya and @BryanCutler, WDYT?
[GitHub] spark pull request #22655: [SPARK-25666][PYTHON] Internally document type co...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/22655

[SPARK-25666][PYTHON] Internally document type conversion between Python data and SQL types in normal UDFs

## What changes were proposed in this pull request?

We are facing some problems with type conversions between Python data and SQL types in UDFs (Pandas UDFs as well). It's even difficult to identify the problems (see https://github.com/apache/spark/pull/20163 and https://github.com/apache/spark/pull/22610). This PR targets to internally document the type conversion table. Some of them look buggy and we should fix them.

```python
import array
import datetime
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf

data = [
    None,
    True,
    1,
    1L,  # Python 2 only
    "a",
    u"a",
    datetime.date(1970, 1, 1),
    datetime.datetime(1970, 1, 1, 0, 0),
    1.0,
    array.array("i", [1]),
    [1],
    (1,),
    bytearray([65, 66, 67]),
    Decimal(1),
    {"a": 1},
    Row(a=1),
    Row("a")(1),
]

types = [
    NullType(),
    BooleanType(),
    ByteType(),
    ShortType(),
    IntegerType(),
    LongType(),
    StringType(),
    DateType(),
    TimestampType(),
    FloatType(),
    DoubleType(),
    ArrayType(IntegerType()),
    BinaryType(),
    DecimalType(10, 0),
    MapType(StringType(), IntegerType()),
    StructType([StructField("_1", IntegerType())]),
]

df = spark.range(1)
results = []
for t in types:
    result = []
    for v in data:
        try:
            row = df.select(udf(lambda: v, t)()).first()
            result.append(row[0])
        except Exception:
            result.append("X")
    results.append([t.simpleString()] + map(str, result))

schema = ["SQL Type \\ Python Value(Type)"] + map(
    lambda v: "%s(%s)" % (str(v), type(v).__name__), data)
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: "# %s # noqa" % line, strings.strip().split("\n"))))
```

## How was this patch tested?

Manually tested and lint check.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-25666

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22655.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22655

commit 3084be1de3ff58a9258dacfb8d7cf575df3fb3c9
Author: hyukjinkwon
Date: 2018-10-06T10:59:46Z

    Internally document type conversion between Python data and SQL types in UDFs
[GitHub] spark pull request #22653: [SPARK-25659][PYTHON] Test type inference specifi...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/22653

[SPARK-25659][PYTHON] Test type inference specification for createDataFrame in PySpark

## What changes were proposed in this pull request?

This PR proposes to specify type inference and simple e2e tests. Looks like we are not cleanly testing those logics. For instance, see https://github.com/apache/spark/blob/08c76b5d39127ae207d9d1fff99c2551e6ce2581/python/pyspark/sql/types.py#L894-L905

Looks like we intended to support datetime.time and None for type inference too, but it does not work:

```
>>> spark.createDataFrame([[datetime.time()]])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 751, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 432, in _createFromLocal
    data = [schema.toInternal(row) for row in data]
  File "/.../spark/python/pyspark/sql/types.py", line 604, in toInternal
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/.../spark/python/pyspark/sql/types.py", line 604, in <genexpr>
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/.../spark/python/pyspark/sql/types.py", line 442, in toInternal
    return self.dataType.toInternal(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 193, in toInternal
    else time.mktime(dt.timetuple()))
AttributeError: 'datetime.time' object has no attribute 'timetuple'

>>> spark.createDataFrame([[None]])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 751, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 419, in _createFromLocal
    struct = self._inferSchemaFromList(data, names=schema)
  File "/.../python/pyspark/sql/session.py", line 353, in _inferSchemaFromList
    raise ValueError("Some of types cannot be determined after inferring")
ValueError: Some of types cannot be determined after inferring
```

## How was this patch tested?

Manual tests and unit tests were added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-25659

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22653.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22653

commit 8fddb9dca26ce36be1f7eaf0d356bf78070486f9
Author: hyukjinkwon
Date: 2018-10-06T09:26:04Z

    Test type inference specification for createDataFrame in PySpark
[GitHub] spark issue #22640: [SPARK-25621][SPARK-25622][TEST] Reduce test time of Buc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22640 Merged to master.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r223173806 --- Diff: python/pyspark/sql/session.py --- @@ -252,6 +255,20 @@ def newSession(self): """ return self.__class__(self._sc, self._jsparkSession.newSession()) +@classmethod +@since(2.5) +def getActiveSession(cls): +""" +Returns the active SparkSession for the current thread, returned by the builder. +>>> s = SparkSession.getActiveSession() +>>> l = [('Alice', 1)] +>>> rdd = s.sparkContext.parallelize(l) +>>> df = s.createDataFrame(rdd, ['name', 'age']) +>>> df.select("age").collect() +[Row(age=1)] +""" +return cls._activeSession --- End diff -- Yea, it should look like that --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22619 Merged to master.
[GitHub] spark issue #22448: [SPARK-25417][SQL] Improve findTightestCommonType to coe...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22448 @dilipbiswal, can you file another JIRA instead of SPARK-25417 specifically for type coercion?
[GitHub] spark issue #22646: [SPARK-25654][SQL] Support for nested JavaBean arrays, l...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22646 ok to test
[GitHub] spark issue #22649: [SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22649 retest this please
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r223173637 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType. When there is mismatch between them, it is not guaranteed +that the conversion by SparkSQL during serialization is correct at all and users might get --- End diff -- > an attempt will be made to cast the data and results should be checked for accuracy." it sounds like the casting is intentional. I think the casting logic is not that clear, as far as I can tell, compared to the SQL casting logic. Can we leave this as "not guaranteed" for now and document the casting logic here instead? Does Arrow have some kind of documentation for type conversion, BTW?
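The accuracy concern in this note can be illustrated without Spark or Arrow at all: a silent numeric cast from the actual result type to the declared return type can lose information instead of raising. A hedged, Spark-free Python illustration (the names below are purely illustrative, not Spark's API):

```python
# Not Spark/Arrow code; just an illustration of why a silent cast from the
# UDF's actual result type to the declared returnType needs an accuracy check.
declared_return_type = int          # stands in for e.g. IntegerType()
results = [0.5, 1.9, -0.5]          # what the UDF actually returned (floats)

cast_results = [declared_return_type(v) for v in results]
assert cast_results == [0, 1, 0]    # values silently truncated, no error
```

This is the scenario the documentation note warns about: the cast succeeds, so users only notice the data loss if they check the results themselves.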
[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/7 Merged to master.
[GitHub] spark issue #22647: [SPARK-25655] [BUILD] Add -Pspark-ganglia-lgpl to the sc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22647 Merged to master.
[GitHub] spark issue #22636: [SPARK-25629][TEST] Reduce ParquetFilterSuite: filter pu...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22636 Yea, it is not obvious and only a few seconds, so it might not be worth much. But it looks like an improvement because it fixes the test cases to test what the previous PR targeted. Wouldn't it be better just to go ahead rather than close this, since the PR is already open?
[GitHub] spark issue #22647: [SPARK-25655] [BUILD] Add -Pspark-ganglia-lgpl to the sc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22647 retest this please
[GitHub] spark issue #22640: [SPARK-25621][SPARK-25622][TEST] Reduce test time of Buc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22640 cc @cloud-fan too for sure.
[GitHub] spark pull request #22640: [SPARK-25621][SPARK-25622][TEST] Reduce test time...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22640#discussion_r222975187 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala --- @@ -66,7 +66,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { .bucketBy(8, "j", "k") .saveAsTable("bucketed_table") - for (i <- 0 until 5) { --- End diff -- Oh haha, I was struggling to see why it was 5 and 13, and looks like it's in `df` :-)
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222971397 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -15,50 +15,51 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, -schema: StructType, +dataType: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { - - private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) - private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) - private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) - // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. 
- private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { -if (corruptFieldIndex.isDefined) { - (row, badRecord) => { -var i = 0 -while (i < actualSchema.length) { - val from = actualSchema(i) - resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull - i += 1 + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = dataType match { +case struct: StructType => + val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) --- End diff -- Do you mind if I ask to make a private function for this one (L38 to L54) and define this around here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r222959106 --- Diff: python/pyspark/sql/session.py --- @@ -252,6 +255,20 @@ def newSession(self): """ return self.__class__(self._sc, self._jsparkSession.newSession()) +@classmethod +@since(2.5) --- End diff -- Let's do this for 3.0. Per https://github.com/apache/spark/commit/9bf397c0e45cb161f3f12f09bd2bf14ff96dc823, looks like we are going ahead for 3.0 now.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r222958780 --- Diff: python/pyspark/sql/session.py --- @@ -252,6 +255,20 @@ def newSession(self): """ return self.__class__(self._sc, self._jsparkSession.newSession()) +@classmethod +@since(2.5) +def getActiveSession(cls): +""" +Returns the active SparkSession for the current thread, returned by the builder. +>>> s = SparkSession.getActiveSession() +>>> l = [('Alice', 1)] +>>> rdd = s.sparkContext.parallelize(l) +>>> df = s.createDataFrame(rdd, ['name', 'age']) +>>> df.select("age").collect() +[Row(age=1)] +""" +return cls._activeSession --- End diff -- Yup.
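The docstring's "for the current thread" wording points at a thread-local active-session registry. A minimal sketch of that pattern follows, with an invented `Session` class (not PySpark's implementation), assuming the builder records whichever session it hands back:

```python
import threading

class Session:
    """Toy model of the active-session pattern behind getActiveSession.

    Illustrative only: PySpark's real SparkSession involves a JVM-side
    session; here the constructor stands in for the builder's bookkeeping.
    """
    _active = threading.local()

    def __init__(self, name):
        self.name = name
        Session._active.value = self  # the builder would normally do this

    @classmethod
    def get_active_session(cls):
        # Returns None when the current thread has never created a session.
        return getattr(cls._active, "value", None)
```

Because the registry is thread-local, a session created on the main thread is not visible from a worker thread, which matches the docstring's "current thread" contract.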
[GitHub] spark issue #21596: [SPARK-24601] Update Jackson to 2.9.6
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21596 Merged to master.
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22633 retest this please
[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22227 retest this please
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222926650 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala --- @@ -51,6 +56,8 @@ object ParseMode extends Logging { case PermissiveMode.name => PermissiveMode case DropMalformedMode.name => DropMalformedMode case FailFastMode.name => FailFastMode +case NullMalformedMode.name => NullMalformedMode --- End diff -- I suggested keeping the previous behaviour only because I thought we were in transition to 2.5. Since we are going ahead with 3.0, I think I am okay without keeping it.
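The `ParseMode.fromString` pattern this hunk extends — match a mode by name, warn and fall back to permissive on anything unrecognized — can be sketched in Python as follows (illustrative only; `from_string` and the string constants are invented names, not Spark code):

```python
import logging

# Mode names mirror the Scala objects in ParseMode; matching is
# case-insensitive, like Scala's uppercase-name comparison.
PERMISSIVE, DROPMALFORMED, FAILFAST = "PERMISSIVE", "DROPMALFORMED", "FAILFAST"
_KNOWN_MODES = {PERMISSIVE, DROPMALFORMED, FAILFAST}

def from_string(mode):
    """Resolve a parse-mode name, defaulting to permissive with a warning."""
    name = mode.upper()
    if name in _KNOWN_MODES:
        return name
    logging.warning("%s is not a valid parse mode; falling back to %s.",
                    mode, PERMISSIVE)
    return PERMISSIVE
```

Adding a mode, as the diff does with `NullMalformedMode`, amounts to one more entry in the known-names set plus its behaviour in the parser.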
[GitHub] spark issue #22635: [SPARK-25591][PySpark][SQL] Avoid overwriting deserializ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22635 Thanks for cc'ing me. Will take a look this week.
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22633 Looks fine to me
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222894056 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): --- End diff -- `foreachBatchFunction` -> `foreach_batch_function` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222894110 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): --- End diff -- `epochId` -> `epoch_id` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893790 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222894028 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF --- End diff -- 4 space indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
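Putting the review comments on this snippet together (snake_case function name, `epoch_id` parameter, 4-space indentation), the corrected Python example would read:

```python
# Corrected form of the guide's Python foreachBatch example, applying the
# naming and indentation suggestions from the review comments above.
def foreach_batch_function(df, epoch_id):
    # Transform and write the micro-batch DataFrame
    pass

# Against a running streaming DataFrame this would be wired up as
# (requires PySpark, shown as a comment here):
# streaming_df.writeStream.foreachBatch(foreach_batch_function).start()
```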
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893841 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893720 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222893572 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22633#discussion_r222893318 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java --- @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.ForeachWriter; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { +spark = new TestSparkSession(); +input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { +Utils.deleteRecursively(new File(input)); +spark.stop(); +spark = null; + } + + @Test + public void testForeachBatchAPI() { +StreamingQuery query = spark +.readStream() +.textFile(input) +.writeStream() +.foreachBatch(new VoidFunction2<Dataset<String>, Long>() { + @Override + public void call(Dataset<String> v1, Long v2) throws Exception { + } +}) +.start(); +query.stop(); + } + + @Test + public void testForeachAPI() { +StreamingQuery query = spark +.readStream() +.textFile(input) +.writeStream() +.foreach(new ForeachWriter<String>() { + @Override + public boolean open(long partitionId, long epochId) { +return true; + } + + @Override + public void process(String value) { + } --- End diff -- tiny nit: I would just `public void process(String value) { }`
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22633#discussion_r222893114 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java --- @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.ForeachWriter; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { +spark = new TestSparkSession(); +input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { +Utils.deleteRecursively(new File(input)); +spark.stop(); +spark = null; + } + + @Test + public void testForeachBatchAPI() { +StreamingQuery query = spark +.readStream() +.textFile(input) +.writeStream() +.foreachBatch(new VoidFunction2<Dataset<String>, Long>() { + @Override + public void call(Dataset<String> v1, Long v2) throws Exception { + } +}) +.start(); +query.stop(); + } + + @Test + public void testForeachAPI() { +StreamingQuery query = spark +.readStream() --- End diff -- Nit: 2 space indentation
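The Java test above exercises `foreachBatch`, whose callback receives each micro-batch together with its epoch id. As a hedged sketch of that callback contract without a running Spark session, the loop below stands in for the streaming engine; `run_micro_batches` is an illustrative helper, not Spark API, and in real PySpark the callback would be passed to `DataStreamWriter.foreachBatch`.

```python
# Sketch of the foreachBatch contract: the engine calls the user function once
# per micro-batch with (batch, epoch_id). `run_micro_batches` is a hypothetical
# stand-in for the streaming engine, used only to illustrate the shape.

def run_micro_batches(batches, on_batch):
    """Invoke the user callback once per micro-batch, like foreachBatch does."""
    for epoch_id, batch in enumerate(batches):
        on_batch(batch, epoch_id)

seen = []

def on_batch(batch, epoch_id):
    # A real callback would write `batch` to a sink; here we just record it.
    seen.append((epoch_id, list(batch)))

run_micro_batches([["a", "b"], ["c"]], on_batch)
# seen == [(0, ["a", "b"]), (1, ["c"])]
```

The Java suite's empty `call` body is the minimal valid callback; any side effect (writing to a table, collecting metrics) happens inside it, keyed by the epoch id for exactly-once bookkeeping.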
[GitHub] spark issue #22619: [SPARK-25600][SQL][MINOR] Make use of TypeCoercion.findT...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22619 Yup. Let me leave this open a few more days, just in case.
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22637 ok to test
[GitHub] spark issue #22637: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22637 Mind filling in the PR description, please?
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22399 Let me trigger it in the next PR.
[GitHub] spark issue #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22399 @Fokko, BTW, let's follow the PR format next time, for instance the "How was this patch tested?" section.
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222885910 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType. When there is mismatch between them, it is not guaranteed +that the conversion by SparkSQL during serialization is correct at all and users might get --- End diff -- maybe I am worrying too much .. but how about just saying .. ``` ... defined returnType (see :meth:`types.to_arrow_type` and :meth:`types.from_arrow_type`). When there is mismatch between them, the conversion is not guaranteed. ```
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222885267 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- Yes .. I'd support just fixing the doc here first, and making a separate PR later if needed.
[GitHub] spark issue #22227: [SPARK-25202] [SQL] Implements split with limit sql func...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/7 retest this please
[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22622 This looks fixed now in https://github.com/apache/spark/commit/5ae20cf1a96a33f5de4435fcfb55914d64466525
[GitHub] spark issue #22622: [SPARK-25635][SQL][BUILD] Support selective direct encod...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22622 retest this please
[GitHub] spark issue #22626: [SPARK-25638][SQL] Adding new function - to_csv()
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22626 add to whitelist
[GitHub] spark issue #22626: [SPARK-25638][SQL] Adding new function - to_csv()
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22626 ok to test
[GitHub] spark issue #21596: [SPARK-24601] Update Jackson to 2.9.6
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21596 retest this please
[GitHub] spark pull request #22622: [SPARK-25635][SQL][BUILD] Support selective direc...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22622#discussion_r222535553 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala --- @@ -115,6 +116,71 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } + protected def testSelectiveDictionaryEncoding(isSelective: Boolean) { +val tableName = "orcTable" + +withTempDir { dir => + withTable(tableName) { +val sqlStatement = orcImp match { + case "native" => +s""" + |CREATE TABLE $tableName (zipcode STRING, uuid STRING, value DOUBLE) + |USING ORC + |OPTIONS ( + | path '${dir.toURI}', + | orc.dictionary.key.threshold '1.0', + | orc.column.encoding.direct 'uuid' --- End diff -- How about changing the column name? I thought it was some kind of enum representing the encoding.
[GitHub] spark pull request #22622: [SPARK-25635][SQL][BUILD] Support selective direc...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22622#discussion_r222535182 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala --- @@ -115,6 +116,71 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } + protected def testSelectiveDictionaryEncoding(isSelective: Boolean) { +val tableName = "orcTable" + +withTempDir { dir => + withTable(tableName) { +val sqlStatement = orcImp match { + case "native" => +s""" + |CREATE TABLE $tableName (zipcode STRING, uuid STRING, value DOUBLE) + |USING ORC + |OPTIONS ( + | path '${dir.toURI}', + | orc.dictionary.key.threshold '1.0', + | orc.column.encoding.direct 'uuid' + |) +""".stripMargin + case "hive" => +s""" + |CREATE TABLE $tableName (zipcode STRING, uuid STRING, value DOUBLE) + |STORED AS ORC + |LOCATION '${dir.toURI}' + |TBLPROPERTIES ( + | orc.dictionary.key.threshold '1.0', + | hive.exec.orc.dictionary.key.size.threshold '1.0', + | orc.column.encoding.direct 'uuid' + |) +""".stripMargin + case impl => +throw new UnsupportedOperationException(s"Unknown ORC implementation: $impl") +} + +sql(sqlStatement) +sql(s"INSERT INTO $tableName VALUES ('94086', 'random-uuid-string', 0.0)") + +val partFiles = dir.listFiles() + .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_")) +assert(partFiles.length === 1) + +val orcFilePath = new Path(partFiles.head.getAbsolutePath) +val readerOptions = OrcFile.readerOptions(new Configuration()) +val reader = OrcFile.createReader(orcFilePath, readerOptions) +var recordReader: RecordReaderImpl = null +try { + recordReader = reader.rows.asInstanceOf[RecordReaderImpl] + + // Check the kind + val stripe = recordReader.readStripeFooter(reader.getStripes.get(0)) + if (isSelective) { +assert(stripe.getColumns(1).getKind === DICTIONARY_V2) --- End diff -- @dongjoon-hyun, how about: ``` assert(stripe.getColumns(1).getKind === DICTIONARY_V2) 
assert(stripe.getColumns(3).getKind === DIRECT) if (isSelective) { assert(stripe.getColumns(2).getKind === DIRECT_V2) } else { assert(stripe.getColumns(2).getKind === DICTIONARY_V2) } ```
[GitHub] spark pull request #22622: [SPARK-25635][SQL][BUILD] Support selective direc...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22622#discussion_r222535254 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala --- @@ -284,4 +350,8 @@ class OrcSourceSuite extends OrcSuite with SharedSQLContext { test("Check BloomFilter creation") { testBloomFilterCreation(Kind.BLOOM_FILTER_UTF8) // After ORC-101 } + + test("Enforce direct encoding column-wise selectively") { +testSelectiveDictionaryEncoding(true) --- End diff -- how about `testSelectiveDictionaryEncoding(isSelective = true)`
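The test in this PR issues different DDL per ORC implementation: the native reader takes the encoding options via `OPTIONS`, the Hive path via `TBLPROPERTIES`. As a hedged sketch of that branching, the helper below composes the two statements from the quoted diff; `build_orc_ddl` is purely illustrative, not Spark API, and the exact option spellings are taken from the test as quoted above.

```python
# Sketch: compose the CREATE TABLE DDL from the OrcSourceSuite test for the
# "native" and "hive" ORC implementations. The options force dictionary
# encoding everywhere (threshold 1.0) except the `uuid` column, which
# `orc.column.encoding.direct` pins to direct encoding.

def build_orc_ddl(impl: str, table: str, location: str) -> str:
    cols = "zipcode STRING, uuid STRING, value DOUBLE"
    opts = ("orc.dictionary.key.threshold '1.0', "
            "orc.column.encoding.direct 'uuid'")
    if impl == "native":
        return (f"CREATE TABLE {table} ({cols}) USING ORC "
                f"OPTIONS (path '{location}', {opts})")
    if impl == "hive":
        return (f"CREATE TABLE {table} ({cols}) STORED AS ORC "
                f"LOCATION '{location}' TBLPROPERTIES ({opts}, "
                f"hive.exec.orc.dictionary.key.size.threshold '1.0')")
    raise ValueError(f"Unknown ORC implementation: {impl}")

ddl = build_orc_ddl("native", "orcTable", "/tmp/orc")
```

The test then reads back the written stripe footer and asserts the column kinds (`DICTIONARY_V2` vs `DIRECT`), which is what the review comments above are refining.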
[GitHub] spark issue #22593: [Streaming][DOC] Fix typo & format in DataStreamWriter.s...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22593 ping @niofire
[GitHub] spark issue #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning when retu...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22610 One clear improvement looks like adding some documentation ..