[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22379 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225218562 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -777,7 +777,6 @@ case class SchemaOfJson( } object JsonExprUtils { - def evalSchemaExpr(exp: Expression): DataType = exp match { --- End diff -- makes sense, thanks!
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225195159 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression +with TimeZoneAwareExpression +with CodegenFallback +with ExpectsInputTypes +with NullIntolerant { + + override def nullable: Boolean = child.nullable + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema: StructType = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => { +if (rows.hasNext) { + rows.next() --- End diff -- of course!
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225190659 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -777,7 +777,6 @@ case class SchemaOfJson( } object JsonExprUtils { - def evalSchemaExpr(exp: Expression): DataType = exp match { --- End diff -- I was following @MaxGekk's opinion (https://github.com/apache/spark/pull/22379/files/93d094f45b02afc0ab2f0650bbde1513823471a2#r224846183).
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225190224 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression +with TimeZoneAwareExpression +with CodegenFallback +with ExpectsInputTypes +with NullIntolerant { + + override def nullable: Boolean = child.nullable + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema: StructType = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => { +if (rows.hasNext) { + rows.next() --- End diff -- It looks like `rows` can't contain more than one row in this CSV code path specifically. 
See below:

```scala
val rawParser = new UnivocityParser(actualSchema, actualSchema, parsedOptions)
new FailureSafeParser[String](
  input => Seq(rawParser.parse(input)),
  mode,
  nullableSchema,
  parsedOptions.columnNameOfCorruptRecord,
  parsedOptions.multiLine)
```

Univocity parser:

```scala
def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
```

and in the `FailureSafeParser`

```scala
class FailureSafeParser[IN](
    rawParser: IN => Seq[InternalRow],
    mode: ParseMode,
    schema: StructType,
    columnNameOfCorruptRecord: String,
    isMultiLine: Boolean) {
```

```scala
def parse(input: IN): Iterator[InternalRow] = {
  try {
    if (skipParsing) {
      Iterator.single(InternalRow.empty)
    } else {
      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
    }
  } catch {
    case e: BadRecordException => mode match {
      case PermissiveMode =>
```
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225186492 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression +with TimeZoneAwareExpression +with CodegenFallback +with ExpectsInputTypes +with NullIntolerant { + + override def nullable: Boolean = child.nullable + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema: StructType = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => { +if (rows.hasNext) { + rows.next() --- End diff -- Oops, I missed that. Let me check.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225183977 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -777,7 +777,6 @@ case class SchemaOfJson( } object JsonExprUtils { - def evalSchemaExpr(exp: Expression): DataType = exp match { --- End diff -- do we still need it?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225033899 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -254,7 +256,7 @@ object TextInputCSVDataSource extends CSVDataSource { val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions) val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions) val tokenRDD = sampled.rdd.mapPartitions { iter => - val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions) + val filteredLines = filterCommentAndEmpty(iter, parsedOptions) --- End diff -- +1
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225033808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression +with TimeZoneAwareExpression +with CodegenFallback +with ExpectsInputTypes +with NullIntolerant { + + override def nullable: Boolean = child.nullable + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema: StructType = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => { +if (rows.hasNext) { + rows.next() --- End diff -- What if `rows` has more than one row? Shall we fail, or shall we return null? To my understanding it should fail. The parser should return only one row for a struct type. If it doesn't, there must be a bug.
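As a rough illustration of the stricter behaviour suggested in the comment above, the converter could treat a second row as a parser bug instead of silently ignoring it. The sketch below is plain Scala with no Spark dependency; `SingleRow` and `exactlyOneOrNone` are hypothetical names that do not appear in the PR, and the choice of `IllegalArgumentException` is an assumption.

```scala
// Hypothetical helper, not part of the PR: unwrap an iterator that is
// expected to hold at most one element.
object SingleRow {
  def exactlyOneOrNone[T](rows: Iterator[T]): Option[T] = {
    if (!rows.hasNext) {
      // Nothing was parsed; a caller could map this to SQL null.
      None
    } else {
      val first = rows.next()
      if (rows.hasNext) {
        // More than one row for a single struct value indicates a bug upstream.
        throw new IllegalArgumentException(
          "Expected at most one row, but the parser returned more")
      }
      Some(first)
    }
  }
}
```

With this shape, an iterator holding one row yields `Some(row)`, an empty iterator yields `None`, and two or more rows raise an error rather than dropping data.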
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225030219 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { +CsvToStructs(schema, options, e.expr) + } + + /** + * (Java-specific) Parses a column containing a CSV string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { --- End diff -- Let me address this one tonight.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224985980 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { +CsvToStructs(schema, options, e.expr) + } + + /** + * (Java-specific) Parses a column containing a CSV string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { --- End diff -- Eh, I wasn't following either. Is the problem related to the function's parameters? We can just define `characterOrColumn` and use it. We can do something like:

```
if # is character
  schema <- lit(schema)
else if # is column
  schema <- schema
else
  stop("it should be column or characters")
```

as you did on the Python side.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224950964 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { +CsvToStructs(schema, options, e.expr) + } + + /** + * (Java-specific) Parses a column containing a CSV string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { --- End diff -- > but I wasn't following - what's the case for schema is a Column? The `schema_of_json`/`schema_of_csv` functions take a column containing a string and return the inferred JSON/CSV schema as a column (in DDL format). To combine `from_json`/`from_csv` with `schema_of_json`/`schema_of_csv`, we need to pass the schema as a `Column`. 
For a Scala example, see: https://github.com/apache/spark/blob/1007cae20e8f566e7d7c25f0f81c9b84f352b6d5/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala#L396
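A minimal sketch of the combination described in the comment above, assuming a Spark build (3.0.0 or later) in which both `schema_of_csv` and a `from_csv` overload taking the schema as a `Column` are available; at the time of this thread `schema_of_csv` was still being prepared in a separate PR. The object name, the sample data and the `local[1]` master are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_csv, lit, schema_of_csv}

// Illustrative only: infer a schema from a sample row, then reuse it for parsing.
object FromCsvWithInferredSchema {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from_csv-with-schema_of_csv-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("1,0.8", "2,1.6").toDF("value")

    // schema_of_csv returns the inferred schema (DDL format) as a Column,
    // which is why from_csv needs an overload accepting a Column schema.
    val inferredSchema = schema_of_csv(lit("1,0.8"))
    val noOptions = new java.util.HashMap[String, String]()

    val parsed = df.select(
      from_csv(col("value"), inferredSchema, noOptions).as("parsed"))
    parsed.printSchema()

    spark.stop()
  }
}
```

The sample row passed to `schema_of_csv` is a literal here; the inferred field names and types depend on the Spark version and options in use.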
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224949068 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { +CsvToStructs(schema, options, e.expr) + } + + /** + * (Java-specific) Parses a column containing a CSV string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { --- End diff -- you can create a new type characterOrstructOrColumn? (letter casing is weird) but I wasn't following - what's the case for schema is a Column?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224846183 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData +import org.apache.spark.sql.types.{MapType, StringType, StructType} + +object ExprUtils { + + def evalSchemaExpr(exp: Expression): StructType = exp match { --- End diff -- The difference between the two functions appeared when I modified `evalSchemaExpr` in `JsonExprUtils` to support `schema_of_json`. When we rebase and prepare `schema_of_csv` in the PR https://github.com/apache/spark/pull/22666, we will merge those two functions.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224844756 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +object CSVUtils { + /** + * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). + * This is currently being used in CSV reading path and CSV schema inference. + */ + def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = { +iter.filter { line => + line.trim.nonEmpty && !line.startsWith(options.comment.toString) +} + } + + /** + * Helper method that converts string representation of a character to actual character. + * It handles some Java escaped strings and throws exception if given string is longer than one + * character. + */ + @throws[IllegalArgumentException] + def toChar(str: String): Char = { --- End diff -- There shouldn't be duplicates there. I moved all functions used in `sql/catalyst` out of `sql/core`. 
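To make the behaviour of the quoted `filterCommentAndEmpty` helper concrete, here is a small stand-alone sketch in plain Scala. `SketchCsvOptions` is a hypothetical stand-in for Spark's `CSVOptions` that models only the `comment` field, and the `'#'` default is an assumption chosen for the demo, not a statement about Spark's defaults.

```scala
// Hypothetical stand-in for CSVOptions; only the comment marker is modelled.
final case class SketchCsvOptions(comment: Char = '#')

object CsvLineFilter {
  // Same predicate as the quoted helper: keep lines that are not blank
  // and do not start with the comment marker.
  def filterCommentAndEmpty(
      iter: Iterator[String],
      options: SketchCsvOptions): Iterator[String] = {
    iter.filter { line =>
      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
    }
  }
}

object CsvLineFilterDemo {
  def main(args: Array[String]): Unit = {
    val lines = Iterator("# a comment", "", "1,0.8", "   ", "2,1.6")
    // Only the two data lines survive the filter.
    CsvLineFilter.filterCommentAndEmpty(lines, SketchCsvOptions()).foreach(println)
  }
}
```

Note that the comment check is applied to the untrimmed line, so a comment marker preceded by whitespace is not filtered out by this predicate.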
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224843696 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { +CsvToStructs(schema, options, e.expr) + } + + /** + * (Java-specific) Parses a column containing a CSV string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { --- End diff -- What stopped me from doing that was that I didn't know how to support the `Column` type in R. I even opened a JIRA ticket for a similar issue related to `schema_of_json` (https://issues.apache.org/jira/browse/SPARK-25446). 
`from_json()` accepts the schema as `characterOrstructType`, and it is not clear to me how to extend that to support the `Column` type as well: https://github.com/apache/spark/blob/17781d75308c328b11cab3658ca4f358539414f2/R/pkg/R/functions.R#L2186
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224649633 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala --- @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset +import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ object CSVUtils { --- End diff -- @MaxGekk, actually I was wondering whether it would be difficult to move this under the catalyst package as well.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224649495 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { --- End diff -- I would suggest that we avoid adding overloaded versions for now ... it has one Java-specific version, so it should be usable from the Java side.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224649188 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { +CsvToStructs(schema, options, e.expr) + } + + /** + * (Java-specific) Parses a column containing a CSV string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = { --- End diff -- @MaxGekk, can we replace `schema: String` with `schema: Column` for `schema_of_csv`?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224648638 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala --- @@ -40,16 +40,6 @@ object CSVUtils { } } - /** - * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). - * This is currently being used in CSV reading path and CSV schema inference. - */ - def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = { --- End diff -- Nope. It's under the execution package, so it is not a public API.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224648258 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -254,7 +256,7 @@ object TextInputCSVDataSource extends CSVDataSource { val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions) val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions) val tokenRDD = sampled.rdd.mapPartitions { iter => - val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions) + val filteredLines = filterCommentAndEmpty(iter, parsedOptions) --- End diff -- Not a big deal, but let's keep the `CSVUtils...` usage just for consistency in this file.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224631648 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -3854,6 +3854,38 @@ object functions { @scala.annotation.varargs def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) } + /** + * Parses a column containing a CSV string into a `StructType` with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing CSV data. + * @param schema the schema to use when parsing the CSV string + * @param options options to control how the CSV is parsed. accepts the same options and the + *CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { --- End diff -- Do we need another API for Java, one that takes `java.util.Map[String, String]`?
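The question above is about a Java-friendly overload next to the Scala one. A minimal sketch of that pattern in plain Scala follows; `CsvApiSketch` and `describe` are hypothetical names used only for illustration and are not part of Spark, and `scala.collection.JavaConverters` is assumed because it is available on Scala 2.11 through 2.13 (it is deprecated on 2.13 in favour of `scala.jdk.CollectionConverters`).

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical stand-in for an API with a Scala and a Java-specific overload.
object CsvApiSketch {
  // Scala-facing version: takes an immutable Scala Map and echoes its inputs.
  def describe(schema: String, options: Map[String, String]): String = {
    val opts = options.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString(",")
    s"schema=[$schema] options=[$opts]"
  }

  // Java-friendly overload: converts java.util.Map and delegates to the Scala version.
  def describe(schema: String, options: JMap[String, String]): String =
    describe(schema, options.asScala.toMap)
}
```

Both overloads share one implementation, so the Java variant adds API surface but no extra logic; whether that extra surface is wanted is exactly what the reviewers are weighing.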
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224631037 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala --- @@ -40,16 +40,6 @@ object CSVUtils { } } - /** - * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). - * This is currently being used in CSV reading path and CSV schema inference. - */ - def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = { --- End diff -- Is this a public API? cc @cloud-fan
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224629618 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +object CSVUtils { + /** + * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). + * This is currently being used in CSV reading path and CSV schema inference. + */ + def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = { +iter.filter { line => + line.trim.nonEmpty && !line.startsWith(options.comment.toString) +} + } + + /** + * Helper method that converts string representation of a character to actual character. + * It handles some Java escaped strings and throws exception if given string is longer than one + * character. + */ + @throws[IllegalArgumentException] + def toChar(str: String): Char = { --- End diff -- Do we need to keep both versions? Can we just let the functions in `org.apache.spark.sql.execution.datasources.csv` call the func here? 
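The suggestion above is to keep a single implementation and let the `org.apache.spark.sql.execution.datasources.csv` side call it. A minimal sketch of that delegation in plain Scala, under stated assumptions: `CsvOptionsSketch`, `CatalystCsvUtilsSketch` and `ExecutionCsvUtilsSketch` are hypothetical stand-ins (the real `CSVOptions` has many more fields), and only the filter body is taken from the quoted diff.

```scala
// Hypothetical stand-in for CSVOptions; only the comment character is modelled.
final case class CsvOptionsSketch(comment: Char)

// Plays the role of the catalyst-side object: the single real implementation.
object CatalystCsvUtilsSketch {
  def filterCommentAndEmpty(
      iter: Iterator[String],
      options: CsvOptionsSketch): Iterator[String] = {
    // Drop blank lines and lines that start with the comment character.
    iter.filter { line =>
      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
    }
  }
}

// Plays the role of the execution-side object: forwards instead of duplicating.
object ExecutionCsvUtilsSketch {
  def filterCommentAndEmpty(
      iter: Iterator[String],
      options: CsvOptionsSketch): Iterator[String] =
    CatalystCsvUtilsSketch.filterCommentAndEmpty(iter, options)
}
```

With this shape, existing callers of the execution-side object keep compiling while the logic lives in one place.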
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224630712 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData +import org.apache.spark.sql.types.{MapType, StringType, StructType} + +object ExprUtils { + + def evalSchemaExpr(exp: Expression): StructType = exp match { --- End diff -- Can we make `JsonExprUtils.evalSchemaExpr` and `ExprUtils.evalSchemaExpr` consistent? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224629829 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData +import org.apache.spark.sql.types.{MapType, StringType, StructType} + +object ExprUtils { --- End diff -- The same comment in this object --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r224629713 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +object CSVUtils { + /** + * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`). + * This is currently being used in CSV reading path and CSV schema inference. + */ + def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = { +iter.filter { line => + line.trim.nonEmpty && !line.startsWith(options.comment.toString) --- End diff -- The same comment here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r223152506 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.Calendar + +import scala.util.Random + +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase { + val badCsv = "\u0000\u0000\u0000A\u0001AAA" + + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + + test("from_csv") { +val csvData = "1" +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), + InternalRow(1) +) + } + + test("from_csv - invalid data") { +val csvData = "---" +val schema = StructType(StructField("a", DoubleType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId), + InternalRow(null)) + +// Default mode is Permissive +checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null)) + } + + test("from_csv null input column") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), + null +) + } + + test("from_csv bad UTF-8") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId), + InternalRow(null)) + } + + test("from_csv with timestamp") { +val schema = StructType(StructField("t", TimestampType) :: Nil) + +val csvData1 = "2016-01-01T00:00:00.123Z" +var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) +c.set(2016, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 123) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId), + 
InternalRow(c.getTimeInMillis * 1000L) +) +// The result doesn't change because the CSV string includes timezone string ("Z" here), +// which means the string represents the timestamp string in the timezone regardless of +// the timeZoneId parameter. +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) +) + +val csvData2 = "2016-01-01T00:00:00" +for (tz <- Random.shuffle(DateTimeTestUtils.ALL_TIMEZONES).take(50)) { --- End diff -- It takes several hundred milliseconds.
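The comment in the quoted test says a timestamp string that carries its own zone designator ("Z") yields the same value regardless of `timeZoneId`, while the zone-less `csvData2` is checked against many time zones. A minimal sketch of that distinction using only `java.time`, not Spark's own parsing path; `TimestampZoneSketch` and its method names are hypothetical.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}

// Hypothetical helper that contrasts zoned and zone-less timestamp strings.
object TimestampZoneSketch {
  // A string with an explicit zone designator ("Z") pins the instant on its own.
  def withExplicitZone(s: String): Instant = Instant.parse(s)

  // A zone-less string is a local date-time; the instant depends on the zone applied to it.
  def withSessionZone(s: String, zone: ZoneId): Instant =
    LocalDateTime.parse(s).atZone(zone).toInstant
}
```

For example, `withSessionZone("2016-01-01T00:00:00", ZoneId.of("UTC"))` and the same call with `ZoneId.of("America/Los_Angeles")` give instants eight hours apart, which is why the test loops over time zones only for the zone-less input.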
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r223039662 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.Calendar + +import scala.util.Random + +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase { + val badCsv = "\u0000\u0000\u0000A\u0001AAA" + + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + + test("from_csv") { +val csvData = "1" +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), + InternalRow(1) +) + } + + test("from_csv - invalid data") { +val csvData = "---" +val schema = StructType(StructField("a", DoubleType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId), + InternalRow(null)) + +// Default mode is Permissive +checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null)) + } + + test("from_csv null input column") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), + null +) + } + + test("from_csv bad UTF-8") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId), + InternalRow(null)) + } + + test("from_csv with timestamp") { +val schema = StructType(StructField("t", TimestampType) :: Nil) + +val csvData1 = "2016-01-01T00:00:00.123Z" +var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) +c.set(2016, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 123) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId), + 
InternalRow(c.getTimeInMillis * 1000L) +) +// The result doesn't change because the CSV string includes timezone string ("Z" here), +// which means the string represents the timestamp string in the timezone regardless of +// the timeZoneId parameter. +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) +) + +val csvData2 = "2016-01-01T00:00:00" +for (tz <- Random.shuffle(DateTimeTestUtils.ALL_TIMEZONES).take(50)) { --- End diff -- how long does this test take?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r223039004 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression +with TimeZoneAwareExpression +with CodegenFallback +with ExpectsInputTypes +with NullIntolerant { + + override def nullable: Boolean = child.nullable + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema: StructType = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => +if (rows.hasNext) { + rows.next() +} else { + throw new IllegalArgumentException("Expected at least one row from CSV parser.") --- End diff -- So we are fine with more than one row?
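The quoted converter fails only when the parser returns no rows, which is what prompts the question about extra rows. One stricter reading, sketched in plain Scala, rejects both the empty case and the multi-row case. This is an illustration of the alternative under discussion, not what the PR settled on; `SingleRowSketch` and `exactlyOne` are hypothetical names, and a generic `T` stands in for `InternalRow`.

```scala
// Hypothetical helper: enforce that an iterator yields exactly one element.
object SingleRowSketch {
  // Returns the only element, or throws when the iterator is empty or has extras.
  def exactlyOne[T](rows: Iterator[T]): T = {
    if (!rows.hasNext) {
      throw new IllegalArgumentException("Expected one row from CSV parser, got none.")
    }
    val first = rows.next()
    if (rows.hasNext) {
      throw new IllegalArgumentException("Expected one row from CSV parser, got more than one.")
    }
    first
  }
}
```

The second `hasNext` check costs one extra call per input string; the trade-off is between silently dropping extra rows and surfacing a malformed multi-record input as an error.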
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222954838 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null --- End diff -- Right, we expect one row from `FailureSafeParser`.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222894891 --- Diff: python/pyspark/sql/functions.py --- @@ -2664,6 +2664,28 @@ def sequence(start, stop, step=None): _to_java_column(start), _to_java_column(stop), _to_java_column(step))) +@ignore_unicode_prefix +@since(3.0) +def from_csv(col, schema, options={}): +""" +Parses a column containing a CSV string into a :class:`StructType` --- End diff -- `StructType` -> `Row`?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222895375 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null --- End diff -- The input string is expected to be a single CSV record, right? Shall we fail if it's not?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222895573 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true --- End diff --

This expression only returns null if the input is null. Shall we define `nullable` as `child.nullable`? This expression should also extend `NullIntolerant`.
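A small sketch of the nullability suggestion, under stated assumptions: `Expr`, `Column` and `FromCsv` are hypothetical simplifications of Catalyst's expression classes, used only so the example is self-contained. The point is that the result's nullability is derived from the child rather than hard-coded to `true`.

```scala
object NullabilitySketch {
  // Hypothetical, heavily simplified stand-in for a Catalyst expression.
  trait Expr { def nullable: Boolean }

  // A leaf whose nullability is known from the input schema.
  final case class Column(name: String, nullable: Boolean) extends Expr

  // A unary expression that yields null only when its input is null,
  // so it can report the child's nullability instead of always `true`.
  final case class FromCsv(child: Expr) extends Expr {
    override def nullable: Boolean = child.nullable
  }
}
```

With this shape, a non-nullable input column gives a non-nullable result, which lets downstream consumers skip null checks.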
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222874417 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.Calendar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase { + val badCsv = "\u0000\u0000\u0000A\u0001AAA" + + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + + test("from_csv") { +val csvData = "1" +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), + InternalRow(1) +) + } + + test("from_csv - invalid data") { +val csvData = "---" +val schema = StructType(StructField("a", DoubleType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId), + InternalRow(null)) + +// Default mode is Permissive +checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null)) + } + + test("from_csv null input column") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), + null +) + } + + test("from_csv bad UTF-8") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId), + InternalRow(null)) + } + + test("from_csv with timestamp") { +val schema = StructType(StructField("t", TimestampType) :: Nil) + +val csvData1 = "2016-01-01T00:00:00.123Z" +var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) +c.set(2016, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 123) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId), + InternalRow(c.getTimeInMillis * 1000L) +) +// The result doesn't change because the CSV string includes timezone string ("Z" here), +// which means the string represents the timestamp string in the timezone regardless of +// the timeZoneId parameter. +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) +) + +val csvData2 = "2016-01-01T00:00:00" +for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { --- End diff --

It sounds good to me.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222807843 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null + + @transient lazy val parser = { +val parsedOptions = new CSVOptions(options, true, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. " + +s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") +} +val actualSchema = + StructType(nullableSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) --- End diff --

added
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222797840 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.Calendar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase { + val badCsv = "\u0000\u0000\u0000A\u0001AAA" + + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + + test("from_csv") { +val csvData = "1" +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), + InternalRow(1) +) + } + + test("from_csv - invalid data") { +val csvData = "---" +val schema = StructType(StructField("a", DoubleType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId), + InternalRow(null)) + +// Default mode is Permissive +checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null)) + } + + test("from_csv null input column") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), + null +) + } + + test("from_csv bad UTF-8") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId), + InternalRow(null)) + } + + test("from_csv with timestamp") { +val schema = StructType(StructField("t", TimestampType) :: Nil) + +val csvData1 = "2016-01-01T00:00:00.123Z" +var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) +c.set(2016, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 123) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId), + InternalRow(c.getTimeInMillis * 1000L) +) +// The result doesn't change because the CSV string includes timezone string ("Z" here), +// which means the string represents the timestamp string in the timezone regardless of +// the timeZoneId parameter. +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) +) + +val csvData2 = "2016-01-01T00:00:00" +for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { --- End diff --

Does it make sense to apply the same approach as in https://github.com/apache/spark/pull/22631?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222796379 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.Calendar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase { + val badCsv = "\u0000\u0000\u0000A\u0001AAA" + + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + + test("from_csv") { +val csvData = "1" +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), + InternalRow(1) +) + } + + test("from_csv - invalid data") { +val csvData = "---" +val schema = StructType(StructField("a", DoubleType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId), + InternalRow(null)) + +// Default mode is Permissive +checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null)) + } + + test("from_csv null input column") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), + null +) + } + + test("from_csv bad UTF-8") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId), + InternalRow(null)) + } + + test("from_csv with timestamp") { +val schema = StructType(StructField("t", TimestampType) :: Nil) + +val csvData1 = "2016-01-01T00:00:00.123Z" +var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) +c.set(2016, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 123) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId), + InternalRow(c.getTimeInMillis * 1000L) +) +// The result doesn't change because the CSV string includes timezone string ("Z" here), +// which means the string represents the timestamp string in the timezone regardless of +// the timeZoneId parameter. +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) +) + +val csvData2 = "2016-01-01T00:00:00" +for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { --- End diff --

@gatorsmile It takes ~9.5 seconds on my laptop.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222777373 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null + + @transient lazy val parser = { +val parsedOptions = new CSVOptions(options, true, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. " + +s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") +} +val actualSchema = + StructType(nullableSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) --- End diff --

Could we have a test case to ensure it works?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222776820 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null --- End diff --

Why not inline this?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222773963 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null + + @transient lazy val parser = { +val parsedOptions = new CSVOptions(options, true, timeZoneId.get) --- End diff --

Use a named boolean argument here: `columnPruning = true`.
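For readers unfamiliar with the convention, the suggestion amounts to naming the boolean at the call site so that a bare `true` does not have to be looked up in the constructor signature. A minimal sketch follows; `CsvOptionsLike` is a hypothetical stand-in for `CSVOptions`, and the parameter name `defaultTimeZoneId` is an assumption made for illustration (only `columnPruning` is named in the comment).

```scala
object NamedArgumentSketch {
  // Hypothetical simplified options class with the same argument order
  // as the constructor call quoted in the diff.
  final class CsvOptionsLike(
      val parameters: Map[String, String],
      val columnPruning: Boolean,
      val defaultTimeZoneId: String)

  // The boolean is passed by name, so the call site documents itself.
  def build(options: Map[String, String], timeZoneId: String): CsvOptionsLike =
    new CsvOptionsLike(options, columnPruning = true, defaultTimeZoneId = timeZoneId)
}
```

Scala accepts positional arguments followed by named ones, so only the ambiguous literal needs a name.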
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222774540 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null + + @transient lazy val parser = { +val parsedOptions = new CSVOptions(options, true, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. " + +s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") +} +val actualSchema = + StructType(nullableSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) +val rawParser = new UnivocityParser(actualSchema, actualSchema, parsedOptions) +new FailureSafeParser[String]( + input => Seq(rawParser.parse(input)), + mode, + nullableSchema, + parsedOptions.columnNameOfCorruptRecord, + parsedOptions.multiLine) + } + + override def dataType: DataType = nullableSchema + + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = +copy(timeZoneId = Option(timeZoneId)) --- End diff --

```
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
  copy(timeZoneId = Option(timeZoneId))
}
```
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222774199 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + + override def nullable: Boolean = true + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null + + @transient lazy val parser = { +val parsedOptions = new CSVOptions(options, true, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. " + --- End diff -- test case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
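The check this comment asks to cover rejects every parse mode except PERMISSIVE and FAILFAST. Below is a minimal plain-Python sketch of that rule and of the kind of test being requested. It is a model for discussion, not Spark or PySpark code: `check_from_csv_mode` is a hypothetical helper, and the mode names are assumed to match the names of Spark's `PermissiveMode`, `FailFastMode` and `DropMalformedMode`.

```python
# Hypothetical model of the mode validation quoted in the diff above.
# Assumption: the mode names mirror Spark's parse mode names.
SUPPORTED_MODES = ("PERMISSIVE", "FAILFAST")


def check_from_csv_mode(mode="PERMISSIVE"):
    """Return the upper-cased mode, or raise if this sketch would reject it."""
    normalized = mode.upper()
    if normalized not in SUPPORTED_MODES:
        # Any other string is rejected here; how Spark treats unrecognized
        # mode names is not modeled in this sketch.
        raise ValueError(
            f"from_csv() doesn't support the {normalized} mode. "
            f"Acceptable modes are {SUPPORTED_MODES[0]} and {SUPPORTED_MODES[1]}."
        )
    return normalized


# The requested test case, in miniature: DROPMALFORMED must be rejected.
try:
    check_from_csv_mode("DROPMALFORMED")
    rejected = False
except ValueError as err:
    rejected = "DROPMALFORMED" in str(err)
```

In the Scala suite the equivalent test would presumably intercept the exception and assert on its message; the sketch only shows the shape of that assertion.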
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222772770 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.Calendar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase { + val badCsv = "\u0000\u0000\u0000A\u0001AAA" + + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + + test("from_csv") { +val csvData = "1" +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), + InternalRow(1) +) + } + + test("from_csv - invalid data") { +val csvData = "---" +val schema = StructType(StructField("a", DoubleType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId), + InternalRow(null)) + +// Default mode is Permissive +checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null)) + } + + test("from_csv null input column") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), + null +) + } + + test("from_csv bad UTF-8") { +val schema = StructType(StructField("a", IntegerType) :: Nil) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId), + InternalRow(null)) + } + + test("from_csv with timestamp") { +val schema = StructType(StructField("t", TimestampType) :: Nil) + +val csvData1 = "2016-01-01T00:00:00.123Z" +var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) +c.set(2016, 0, 1, 0, 0, 0) +c.set(Calendar.MILLISECOND, 123) +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId), + InternalRow(c.getTimeInMillis * 1000L) +) +// The result doesn't change because the CSV string includes 
timezone string ("Z" here), +// which means the string represents the timestamp string in the timezone regardless of +// the timeZoneId parameter. +checkEvaluation( + CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) +) + +val csvData2 = "2016-01-01T00:00:00" +for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { --- End diff -- how long does this test case take? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
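The comment in the quoted test states that a timestamp string carrying its own zone designator ("Z") yields the same value whatever `timeZoneId` is passed, while a string without one is interpreted in that time zone. The plain-Python sketch below illustrates that rule only; it does not use Spark. `to_epoch_micros` is a hypothetical helper, and a fixed UTC-8 offset stands in for "PST" as a simplifying assumption.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_micros(text, default_tz):
    """Parse an ISO-8601 timestamp; apply default_tz only if it has no zone."""
    if text.endswith("Z"):
        # Older versions of datetime.fromisoformat() reject a bare "Z".
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=default_tz)
    # Microseconds since the epoch, the unit the quoted test compares against.
    return (parsed - EPOCH) // timedelta(microseconds=1)


GMT = timezone.utc
PST = timezone(timedelta(hours=-8))  # assumption: fixed offset, no DST rules

with_zone_gmt = to_epoch_micros("2016-01-01T00:00:00.123Z", GMT)
with_zone_pst = to_epoch_micros("2016-01-01T00:00:00.123Z", PST)
no_zone_gmt = to_epoch_micros("2016-01-01T00:00:00", GMT)
no_zone_pst = to_epoch_micros("2016-01-01T00:00:00", PST)
```

The first pair comes out equal and the second pair differs by eight hours, which is the distinction the test loop over all time zones is checking.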
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r222771917 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} --- End diff -- Also add an example with options. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
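To make the request for an example with options concrete, here is a toy plain-Python model of what a call with and without an option could look like. It is a sketch under stated assumptions, not the Spark implementation: `from_csv_sketch` is a hypothetical function, the schema is passed as (name, converter) pairs instead of a DDL string, and only the `sep` option of the CSV data source is modeled.

```python
import csv


def from_csv_sketch(csv_str, schema, options=None):
    """Toy model of from_csv: one CSV line -> dict keyed by schema field names."""
    if csv_str is None:
        return None  # a null input yields a null result
    options = options or {}
    sep = options.get("sep", ",")  # assumption: only `sep` is modeled
    values = next(csv.reader([csv_str], delimiter=sep), [])
    row = {}
    for i, (name, convert) in enumerate(schema):
        # Every field is treated as nullable: missing columns become None.
        row[name] = convert(values[i]) if i < len(values) else None
    return row


SCHEMA = [("a", int), ("b", float)]

default_sep = from_csv_sketch("1, 0.8", SCHEMA)
custom_sep = from_csv_sketch("1;0.8", SCHEMA, {"sep": ";"})
missing_col = from_csv_sketch("1", SCHEMA)
```

The first call mirrors the existing SQL example; the second shows how an options map changes parsing without changing the result; the third shows why the expression forces the schema to be nullable.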
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r221449056 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -520,7 +520,10 @@ object FunctionRegistry { castAlias("date", DateType), castAlias("timestamp", TimestampType), castAlias("binary", BinaryType), -castAlias("string", StringType) +castAlias("string", StringType), + +// csv +expression[CsvToStructs]("from_csv") --- End diff -- This sounds like a reasonable change. cc @rxin
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r221424336 --- Diff: R/pkg/R/functions.R --- @@ -2203,6 +2209,23 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") column(jc) }) +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @aliases from_csv from_csv,Column,character-method +#' @note from_csv since 2.5.0 --- End diff -- added --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r221415711 --- Diff: R/pkg/R/functions.R --- @@ -2203,6 +2209,23 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") column(jc) }) +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @aliases from_csv from_csv,Column,character-method +#' @note from_csv since 2.5.0 --- End diff -- consider adding example as in https://github.com/apache/spark/pull/22379/files#diff-d97f9adc2dcac0703568c799ff106987R2180? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r221047590 --- Diff: R/pkg/R/functions.R --- @@ -2204,6 +2210,26 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") }) #' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @aliases from_csv from_csv,Column,character-method +#' @note from_csv since 2.5.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { +options <- varargsToStrEnv(...) +jc <- callJStatic("org.apache.spark.sql.functions", + "from_csv", + x@jc, schema, options) +column(jc) + }) + +#' @details +#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a +#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' +#' would yield '2017-07-14 03:40:00.0'. --- End diff -- Please remove the above three lines which were removed via https://github.com/apache/spark/commit/ff876137faba1802b66ecd483ba15f6ccd83ffc5#diff-d97f9adc2dcac0703568c799ff106987L2207 by @cloud-fan . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r220409483 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -520,7 +520,10 @@ object FunctionRegistry { castAlias("date", DateType), castAlias("timestamp", TimestampType), castAlias("binary", BinaryType), -castAlias("string", StringType) +castAlias("string", StringType), + +// csv +expression[CsvToStructs]("from_csv") --- End diff -- Yea, it looks so. That's actually the concern in https://github.com/apache/spark/pull/22379#discussion_r216443482 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r220408893 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -520,7 +520,10 @@ object FunctionRegistry { castAlias("date", DateType), castAlias("timestamp", TimestampType), castAlias("binary", BinaryType), -castAlias("string", StringType) +castAlias("string", StringType), + +// csv +expression[CsvToStructs]("from_csv") --- End diff -- So the only reason to move CSV functionality to catalyst is to register it as a built-in function?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r220346397 --- Diff: R/pkg/NAMESPACE --- @@ -275,6 +275,7 @@ exportMethods("%<=>%", "format_number", "format_string", "from_json", + "from_csv", --- End diff -- sorted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r220345834 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +class CsvFunctionsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + val noOptions = Map[String, String]() + + test("from_csv") { +val df = Seq("1").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_csv($"value", schema, noOptions)), + Row(Row(1)) :: Nil) + } + + test("from_csv with option") { +val df = Seq("26/08/2015 18:00").toDS() +val schema = new StructType().add("time", TimestampType) +val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") + +checkAnswer( + df.select(from_csv($"value", schema, options)), + Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) + } + + test("from_csv missing columns") { --- End diff -- I moved most of the tests for `CsvFunctionsSuite` to `CsvExpressionsSuite`. 
I left 2 tests to cover both signatures of the `from_csv` method.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r220345504 --- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql --- @@ -0,0 +1,12 @@ +-- from_csv +describe function from_csv; +describe function extended from_csv; +select from_csv('1', 'a INT'); --- End diff -- I removed it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r220345402 --- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql --- @@ -0,0 +1,12 @@ +-- from_csv +describe function from_csv; +describe function extended from_csv; --- End diff -- I moved most of the tests for `CsvFunctionsSuite` to `CsvExpressionsSuite`. I left 2 tests to cover both signatures of the `from_csv` method.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r219947088 --- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql --- @@ -0,0 +1,12 @@ +-- from_csv +describe function from_csv; +describe function extended from_csv; --- End diff -- Do you mean both `describe` or only the last one? I would actually leave it to keep consistency with `from_json` tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r219945469 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +class CsvFunctionsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + val noOptions = Map[String, String]() + + test("from_csv") { +val df = Seq("1").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_csv($"value", schema, noOptions)), + Row(Row(1)) :: Nil) + } + + test("from_csv with option") { +val df = Seq("26/08/2015 18:00").toDS() +val schema = new StructType().add("time", TimestampType) +val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") + +checkAnswer( + df.select(from_csv($"value", schema, options)), + Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) + } + + test("from_csv missing columns") { --- End diff -- I just looked at tests for `from_json` and ported relevant tests. 
I thought it would be convenient to keep the same test structure.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r219815230 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +class CsvFunctionsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + val noOptions = Map[String, String]() + + test("from_csv") { +val df = Seq("1").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_csv($"value", schema, noOptions)), + Row(Row(1)) :: Nil) + } + + test("from_csv with option") { +val df = Seq("26/08/2015 18:00").toDS() +val schema = new StructType().add("time", TimestampType) +val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") + +checkAnswer( + df.select(from_csv($"value", schema, options)), + Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) + } + + test("from_csv missing columns") { --- End diff -- @MaxGekk, I think we just need a small set of end-to-end tests here, with the other tests moved into `CsvExpressionsSuite.scala`.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r219814691 --- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql --- @@ -0,0 +1,12 @@ +-- from_csv +describe function from_csv; +describe function extended from_csv; --- End diff -- Shall we not test this? It requires fixing the tests accordingly every time we fix the doc.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r219814329 --- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql --- @@ -0,0 +1,12 @@ +-- from_csv +describe function from_csv; +describe function extended from_csv; +select from_csv('1', 'a INT'); --- End diff -- I think we don't need this test case since the below one tests it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r219813519 --- Diff: python/pyspark/sql/functions.py --- @@ -2637,6 +2637,29 @@ def sequence(start, stop, step=None): _to_java_column(start), _to_java_column(stop), _to_java_column(step))) +@ignore_unicode_prefix +@since(2.5) +def from_csv(col, schema, options={}): +""" +Parses a column containing a CSV string into a :class:`StructType` +with the specified schema. Returns `null`, in the case of an unparseable string. + +:param col: string column in CSV format +:param schema: a string with schema in DDL format to use when parsing the CSV column. +:param options: options to control parsing. accepts the same options as the CSV datasource + +>>> from pyspark.sql.types import * --- End diff -- Looks like we don't need this import.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r217901558 --- Diff: R/pkg/NAMESPACE --- @@ -275,6 +275,7 @@ exportMethods("%<=>%", "format_number", "format_string", "from_json", + "from_csv", --- End diff -- please sort this?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r217901588 --- Diff: R/pkg/R/functions.R --- @@ -2202,6 +2208,24 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") column(jc) }) +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @aliases from_csv from_csv,Column,character-method +#' --- End diff -- newline with `#'` is significant in ROxygen, please remove this line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216875875 --- Diff: R/pkg/R/functions.R --- @@ -3720,3 +3720,22 @@ setMethod("current_timestamp", jc <- callJStatic("org.apache.spark.sql.functions", "current_timestamp") column(jc) }) + +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @param schema a DDL-formatted string +#' @aliases from_csv from_csv,Column,character-method +#' +#' @note from_csv since 3.0.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { --- End diff -- here https://github.com/apache/spark/blob/d2bfd9430f05d006accdecb6a62ed659fbd6a2f8/R/pkg/R/functions.R#L199 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216875804 --- Diff: R/pkg/R/functions.R --- @@ -3720,3 +3720,22 @@ setMethod("current_timestamp", jc <- callJStatic("org.apache.spark.sql.functions", "current_timestamp") column(jc) }) + +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @param schema a DDL-formatted string +#' @aliases from_csv from_csv,Column,character-method +#' +#' @note from_csv since 3.0.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { --- End diff -- No, this will break. I mean: find the original doc under `@rdname column_collection_functions`, which already documents `...`, and add this there.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216734651 --- Diff: R/pkg/R/functions.R --- @@ -3720,3 +3720,22 @@ setMethod("current_timestamp", jc <- callJStatic("org.apache.spark.sql.functions", "current_timestamp") column(jc) }) + +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @param schema a DDL-formatted string +#' @aliases from_csv from_csv,Column,character-method +#' +#' @note from_csv since 3.0.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { --- End diff -- @felixcheung I added a doc, but when I run `run-tests.sh` it outputs this warning:
```
* checking Rd \usage sections ... WARNING
Duplicated \argument entries in documentation object 'column_collection_functions':
  ‘schema’
Functions with \usage entries need to have the appropriate \alias entries, and all their arguments documented.
The \usage entries must correspond to syntactically valid R code.
See chapter ‘Writing R documentation files’ in the ‘Writing R Extensions’ manual.
```
Is that OK? If it isn't, any ideas what could cause it?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216538924 --- Diff: R/pkg/R/functions.R --- @@ -3720,3 +3720,22 @@ setMethod("current_timestamp", jc <- callJStatic("org.apache.spark.sql.functions", "current_timestamp") column(jc) }) + +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @param schema a DDL-formatted string +#' @aliases from_csv from_csv,Column,character-method +#' +#' @note from_csv since 3.0.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { --- End diff -- Can you add to the doc for `...` (in column_collection_functions) to indicate which options this function accepts, if there is anything new?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216510114 --- Diff: sql/catalyst/pom.xml --- @@ -103,6 +103,12 @@ <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> +<dependency> +<groupId>com.univocity</groupId> +<artifactId>univocity-parsers</artifactId> +<version>2.7.3</version> +<type>jar</type> +</dependency> --- End diff -- Ideally we should make CSV a separate external module, and that should be the right way given the discussion. The current change isn't necessarily blocked, but I can see the point that moving the dependency makes further refactoring potentially harder, as pointed out. It looks like many people agreed on separating them. The concern here is that it sounds like we are stepping back from the ideal approach.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216509108 --- Diff: R/pkg/R/functions.R --- @@ -3720,3 +3720,22 @@ setMethod("current_timestamp", jc <- callJStatic("org.apache.spark.sql.functions", "current_timestamp") column(jc) }) + +#' @details +#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType} +#' with the specified \code{schema}. +#' If the string is unparseable, the Column will contain the value NA. +#' +#' @rdname column_collection_functions +#' @param schema a DDL-formatted string +#' @aliases from_csv from_csv,Column,character-method +#' +#' @note from_csv since 3.0.0 +setMethod("from_csv", signature(x = "Column", schema = "character"), + function(x, schema, ...) { +options <- varargsToStrEnv(...) +jc <- callJStatic("org.apache.spark.sql.functions", + "from_csv", + x@jc, schema, options) +column(jc) + }) --- End diff -- newline at the end
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216446691 --- Diff: sql/catalyst/pom.xml --- @@ -103,6 +103,12 @@ <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> +<dependency> +<groupId>com.univocity</groupId> +<artifactId>univocity-parsers</artifactId> +<version>2.7.3</version> +<type>jar</type> +</dependency> --- End diff -- I added the dependency only because I had to move `UnivocityParser` from `sql/core` to `sql/catalyst`: code in `sql/catalyst` cannot access classes that live in `sql/core`. What is the right approach here?
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r216443482 --- Diff: sql/catalyst/pom.xml --- @@ -103,6 +103,12 @@ <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> +<dependency> +<groupId>com.univocity</groupId> +<artifactId>univocity-parsers</artifactId> +<version>2.7.3</version> +<type>jar</type> +</dependency> --- End diff -- Hi @MaxGekk, @gatorsmile, and @cloud-fan. I know this is the same approach as `from_json`, but I'm now wondering whether `sql` -> `catalyst` is the right direction to evolve in. Recently, we made `avro` an external module, and [the development direction was the opposite](https://github.com/apache/spark/pull/21742#discussion_r201569743). If we put this into `catalyst`, it becomes harder to separate from the sql module. > Ideally, we should separate parquet, orc and other built-in data sources from sql module.
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
GitHub user MaxGekk opened a pull request: https://github.com/apache/spark/pull/22379 [SPARK-25393][SQL] Adding new function from_csv() ## What changes were proposed in this pull request? The PR adds a new function `from_csv()`, similar to `from_json()`, to parse columns containing CSV strings. I added the following method: ```Scala def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column ``` and this signature for calling it from Python, R and Java: ```Scala def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column ``` ## How was this patch tested? Added new test suites `CsvExpressionsSuite`, `CsvFunctionsSuite` and sql tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/MaxGekk/spark-1 from_csv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22379.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22379 commit 344c2ab62095f91e161814230082f0d30c257365 Author: Maxim Gekk Date: 2018-09-09T13:07:34Z Initial implementation of CsvToStruct commit 5905019dd3fbe9dbba20505bf9df2b8ca62b4849 Author: Maxim Gekk Date: 2018-09-09T18:22:32Z Added CSV Expression Test Suite commit c5ac43273e70199467c28aaa40591042eac096aa Author: Maxim Gekk Date: 2018-09-09T18:42:49Z Register from_csv functions and add tests commit bd2124cfcf00b8e049154dad9903b22b176347bb Author: Maxim Gekk Date: 2018-09-09T19:07:00Z Fix imports commit b9bb081a3d655dbdece9d8a998853eda68171553 Author: Maxim Gekk Date: 2018-09-09T19:23:16Z Adding SQL tests commit 14ae619e2d6f18156fb24f0e391fb7ad11548ddd Author: Maxim Gekk Date: 2018-09-09T20:18:33Z from_csv for PySpark commit cfb2ac3844ccc7a374b3c3ec58150264b545269f Author: Maxim Gekk Date: 2018-09-09T20:47:27Z from_csv for SparkR
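For readers who want to try the `StructType` signature from the PR description, here is a minimal sketch of how it might be called. It assumes a Spark build that includes `from_csv` (3.0.0 or later, per the `@note` in the R doc) on the classpath and a local session. The object name `FromCsvSketch`, the column name `value`, the alias `rec`, and the two-field schema are hypothetical and chosen only for illustration; they do not come from the PR.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical driver object; not part of the PR.
object FromCsvSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local, single-threaded session is enough for the demo.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from_csv-sketch")
      .getOrCreate()
    import spark.implicits._

    // Illustrative schema describing the fields of each CSV string.
    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)

    // One CSV record per row, stored in a string column named "value".
    val df = Seq("1,Alice", "2,Bob").toDF("value")

    // Parse the string column into a struct column using the
    // (Column, StructType, Map[String, String]) signature; no options set.
    val parsed = df.select(
      from_csv($"value", schema, Map.empty[String, String]).as("rec"))

    // Access the struct fields like ordinary nested columns.
    parsed.select($"rec.id", $"rec.name").show()

    spark.stop()
  }
}
```

CSV reader options (a custom separator, for example) would be passed through the `options` map; which keys are honored depends on the Spark version in use.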