[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22379


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225218562
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -777,7 +777,6 @@ case class SchemaOfJson(
 }
 
 object JsonExprUtils {
-
   def evalSchemaExpr(exp: Expression): DataType = exp match {
--- End diff --

makes sense, thanks!


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225195159
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+       {"a":1, "b":0.8}
+      > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+    with TimeZoneAwareExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override def nullable: Boolean = child.nullable
+
+  // The CSV input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema: StructType = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = options,
+      child = child,
+      timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => {
+    if (rows.hasNext) {
+      rows.next()
--- End diff --

of course!
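
For reference, a minimal end-to-end sketch of the new function from the DataFrame API, mirroring the SQL examples in the diff above (the local SparkSession setup and input values are illustrative, not taken from the PR's test suite):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

object FromCsvExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Schema matching the SQL example: _FUNC_('1, 0.8', 'a INT, b DOUBLE')
    val schema = new StructType().add("a", IntegerType).add("b", DoubleType)

    val df = Seq("1, 0.8").toDF("csv")
    // Scala-specific overload added by this PR: from_csv(e, schema, options)
    df.select(from_csv($"csv", schema, Map.empty[String, String])).show()
    // expected result: a struct column with a = 1, b = 0.8

    spark.stop()
  }
}
```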


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225190659
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -777,7 +777,6 @@ case class SchemaOfJson(
 }
 
 object JsonExprUtils {
-
   def evalSchemaExpr(exp: Expression): DataType = exp match {
--- End diff --

I was following @MaxGekk's opinion 
(https://github.com/apache/spark/pull/22379/files/93d094f45b02afc0ab2f0650bbde1513823471a2#r224846183).


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225190224
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+       {"a":1, "b":0.8}
+      > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+    with TimeZoneAwareExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override def nullable: Boolean = child.nullable
+
+  // The CSV input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema: StructType = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = options,
+      child = child,
+      timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => {
+    if (rows.hasNext) {
+      rows.next()
--- End diff --

It looks like `rows` can't contain more than one row in this CSV code path specifically.

See below:

```scala
val rawParser = new UnivocityParser(actualSchema, actualSchema, parsedOptions)
new FailureSafeParser[String](
  input => Seq(rawParser.parse(input)),
  mode,
  nullableSchema,
  parsedOptions.columnNameOfCorruptRecord,
  parsedOptions.multiLine)
```

Univocity parser:

```scala
  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
```

and in the `FailureSafeParser`:

```scala
class FailureSafeParser[IN](
    rawParser: IN => Seq[InternalRow],
    mode: ParseMode,
    schema: StructType,
    columnNameOfCorruptRecord: String,
    isMultiLine: Boolean) {
```

```scala
  def parse(input: IN): Iterator[InternalRow] = {
    try {
      if (skipParsing) {
        Iterator.single(InternalRow.empty)
      } else {
        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
      }
    } catch {
      case e: BadRecordException => mode match {
        case PermissiveMode =>
```

[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225186492
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+       {"a":1, "b":0.8}
+      > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+    with TimeZoneAwareExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override def nullable: Boolean = child.nullable
+
+  // The CSV input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema: StructType = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = options,
+      child = child,
+      timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => {
+    if (rows.hasNext) {
+      rows.next()
--- End diff --

Oops, I missed that. Let me check.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225183977
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -777,7 +777,6 @@ case class SchemaOfJson(
 }
 
 object JsonExprUtils {
-
   def evalSchemaExpr(exp: Expression): DataType = exp match {
--- End diff --

do we still need it?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225033899
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
@@ -254,7 +256,7 @@ object TextInputCSVDataSource extends CSVDataSource {
     val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
     val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
     val tokenRDD = sampled.rdd.mapPartitions { iter =>
-      val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
+      val filteredLines = filterCommentAndEmpty(iter, parsedOptions)
--- End diff --

+1


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225033808
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+       {"a":1, "b":0.8}
+      > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+    with TimeZoneAwareExpression
+    with CodegenFallback
+    with ExpectsInputTypes
+    with NullIntolerant {
+
+  override def nullable: Boolean = child.nullable
+
+  // The CSV input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema: StructType = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = options,
+      child = child,
+      timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => {
+    if (rows.hasNext) {
+      rows.next()
--- End diff --

what if `rows` has more than one row? Shall we fail, or shall we return null?

To my understanding, it should fail. The parser should only return one row for a struct type. If it doesn't, there must be a bug.
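
For concreteness, the stricter contract being asked for could look roughly like this (a hypothetical sketch, not the code that was merged):

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical strict converter: require exactly one parsed row, fail otherwise.
def requireSingleRow(rows: Iterator[InternalRow]): InternalRow = {
  if (!rows.hasNext) {
    throw new IllegalArgumentException("Expected at least one row from CSV parser.")
  }
  val row = rows.next()
  if (rows.hasNext) {
    throw new IllegalArgumentException("Expected only one row from CSV parser.")
  }
  row
}
```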


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225030219
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
--- End diff --

Let me address this one tonight.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224985980
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
--- End diff --

Eh, I wasn't following either. Is the problem related to the function's parameters? We can just define `characterOrColumn` and use it. We can do something like:

```
if (is.character(schema)) {
  schema <- lit(schema)
} else if (class(schema) != "Column") {
  stop("schema should be a Column or a character")
}
```

like you did on the Python side.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-13 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224950964
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
--- End diff --

> But I wasn't following - what's the use case where the schema is a Column?

The `schema_of_json`/`schema_of_csv` functions take a string in a column and return the inferred schema of the JSON/CSV as a column (in DDL format). To combine `from_json`/`from_csv` with `schema_of_json`/`schema_of_csv`, we need to pass the schema as the `Column` type. See in Scala:
https://github.com/apache/spark/blob/1007cae20e8f566e7d7c25f0f81c9b84f352b6d5/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala#L396
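
The combination being described, sketched against the JSON counterparts that already existed at the time (based on the linked JsonFunctionsSuite test; the sample literals are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, lit, schema_of_json}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// schema_of_json infers the schema from a sample and returns it as a Column
// (in DDL format); from_json can then take that Column as its schema argument.
val input = Seq("""{"id": 1}""").toDF("json")
val parsed = input.select(from_json($"json", schema_of_json(lit("""{"id": 1}"""))))
```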


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-12 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224949068
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
--- End diff --

You could create a new type, characterOrstructOrColumn? (The letter casing is weird.)

But I wasn't following - what's the use case where the schema is a Column?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-12 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224846183
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{MapType, StringType, StructType}
+
+object ExprUtils {
+
+  def evalSchemaExpr(exp: Expression): StructType = exp match {
--- End diff --

The difference between the two functions appeared when I modified 
`evalSchemaExpr` in `JsonExprUtils` to support `schema_of_json`. When we rebase 
and prepare `schema_of_csv` in the PR 
https://github.com/apache/spark/pull/22666, we will merge those two functions. 
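
For context, the CSV-side `ExprUtils.evalSchemaExpr` under discussion only accepts a foldable string literal in DDL format; roughly (reconstructed from the truncated diff above, so treat the error message as approximate):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.{StringType, StructType}

// A non-literal schema expression (e.g. an arbitrary Column) is rejected at analysis time.
def evalSchemaExpr(exp: Expression): StructType = exp match {
  case Literal(s, StringType) => StructType.fromDDL(s.toString)
  case e => throw new AnalysisException(
    s"Schema should be specified in DDL format as a string literal instead of ${e.sql}")
}
```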


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-12 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224844756
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+object CSVUtils {
+  /**
+   * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
+   * This is currently being used in CSV reading path and CSV schema inference.
+   */
+  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
+    iter.filter { line =>
+      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
+    }
+  }
+
+  /**
+   * Helper method that converts string representation of a character to actual character.
+   * It handles some Java escaped strings and throws exception if given string is longer than one
+   * character.
+   */
+  @throws[IllegalArgumentException]
+  def toChar(str: String): Char = {
--- End diff --

There shouldn't be duplicates there. I moved all functions used in 
`sql/catalyst` out of `sql/core`. 
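
For readers without the file open, the kind of escape handling `toChar` performs looks roughly like this (an illustrative sketch, not the exact Spark implementation; the error message is paraphrased):

```scala
// Convert a one-character string, or a Java-style escape such as "\\t",
// to the single Char it denotes.
def toChar(str: String): Char = str.toSeq match {
  case Seq(c)          => c      // a single plain character
  case Seq('\\', 't')  => '\t'   // tab
  case Seq('\\', 'r')  => '\r'   // carriage return
  case Seq('\\', 'n')  => '\n'   // newline
  case Seq('\\', '\\') => '\\'   // escaped backslash
  case _ => throw new IllegalArgumentException(
    s"Delimiter must be a single character: $str")
}
```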


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-12 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224843696
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
--- End diff --

What stopped me from doing that was that I didn't know how to support the `Column` type in R. I even opened a JIRA ticket for a similar issue related to `schema_of_json`: https://issues.apache.org/jira/browse/SPARK-25446 . `from_json()` accepts the schema as `characterOrstructType`, and how to extend it to support the `Column` type as well is not clear to me:

https://github.com/apache/spark/blob/17781d75308c328b11cab3658ca4f358539414f2/R/pkg/R/functions.R#L2186
 


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224649633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
 ---
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types._
 
 object CSVUtils {
--- End diff --

@MaxGekk, actually I was wondering if it would be difficult to move this under the catalyst package as well.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224649495
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
--- End diff --

I would like to suggest avoiding adding overloaded versions for now ... it has one Java-specific version, so it should be usable from the Java side.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224649188
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
--- End diff --

@MaxGekk, can we replace `schema: String` with `schema: Column` for `schema_of_csv`?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224648638
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
 ---
@@ -40,16 +40,6 @@ object CSVUtils {
 }
   }
 
-  /**
-   * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
-   * This is currently being used in CSV reading path and CSV schema inference.
-   */
-  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
--- End diff --

Nope. It's under the execution package.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224648258
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
@@ -254,7 +256,7 @@ object TextInputCSVDataSource extends CSVDataSource {
     val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
     val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
     val tokenRDD = sampled.rdd.mapPartitions { iter =>
-      val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
+      val filteredLines = filterCommentAndEmpty(iter, parsedOptions)
--- End diff --

Not a big deal, but let's just use the `CSVUtils...` form for consistency in this file.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224631648
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { 
MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
--- End diff --

Do we need another API for Java? `java.util.Map[String, String]`?
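
A Java-friendly overload typically just bridges the map type and delegates to the Scala one; a sketch of that usual pattern (not necessarily the exact body added in this PR):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType

// Sketch: accept java.util.Map and delegate to the Scala-specific overload above.
def from_csv(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
  from_csv(e, schema, options.asScala.toMap)
```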


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224631037
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
 ---
@@ -40,16 +40,6 @@ object CSVUtils {
 }
   }
 
-  /**
-   * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
-   * This is currently being used in CSV reading path and CSV schema inference.
-   */
-  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
--- End diff --

Is this a public API? cc @cloud-fan 


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224629618
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+object CSVUtils {
+  /**
+   * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
+   * This is currently being used in CSV reading path and CSV schema inference.
+   */
+  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
+    iter.filter { line =>
+      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
+    }
+  }
+
+  /**
+   * Helper method that converts string representation of a character to actual character.
+   * It handles some Java escaped strings and throws exception if given string is longer than one
+   * character.
+   */
+  @throws[IllegalArgumentException]
+  def toChar(str: String): Char = {
--- End diff --

Do we need to keep both versions? Can we just let the functions in `org.apache.spark.sql.execution.datasources.csv` call the function here?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224630712
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{MapType, StringType, StructType}
+
+object ExprUtils {
+
+  def evalSchemaExpr(exp: Expression): StructType = exp match {
--- End diff --

Can we make `JsonExprUtils.evalSchemaExpr` and `ExprUtils.evalSchemaExpr` 
consistent? 


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224629829
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{MapType, StringType, StructType}
+
+object ExprUtils {
--- End diff --

The same comment in this object


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r224629713
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+object CSVUtils {
+  /**
+   * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
+   * This is currently being used in CSV reading path and CSV schema inference.
+   */
+  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
+    iter.filter { line =>
+      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
--- End diff --

The same comment here.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-05 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r223152506
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.Calendar
+
+import scala.util.Random
+
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase {
+  val badCsv = "\u0000\u0000\u0000A\u0001AAA"
+
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
+  test("from_csv") {
+    val csvData = "1"
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(csvData), gmtId),
+      InternalRow(1)
+    )
+  }
+
+  test("from_csv - invalid data") {
+    val csvData = "---"
+    val schema = StructType(StructField("a", DoubleType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId),
+      InternalRow(null))
+
+    // Default mode is Permissive
+    checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null))
+  }
+
+  test("from_csv null input column") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
+      null
+    )
+  }
+
+  test("from_csv bad UTF-8") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId),
+      InternalRow(null))
+  }
+
+  test("from_csv with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+    val csvData1 = "2016-01-01T00:00:00.123Z"
+    var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 123)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+    // The result doesn't change because the CSV string includes timezone string ("Z" here),
+    // which means the string represents the timestamp string in the timezone regardless of
+    // the timeZoneId parameter.
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+
+    val csvData2 = "2016-01-01T00:00:00"
+    for (tz <- Random.shuffle(DateTimeTestUtils.ALL_TIMEZONES).take(50)) {
--- End diff --

Several hundred milliseconds.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r223039662
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.Calendar
+
+import scala.util.Random
+
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase {
+  val badCsv = "\u0000\u0000\u0000A\u0001AAA"
+
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
+  test("from_csv") {
+    val csvData = "1"
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(csvData), gmtId),
+      InternalRow(1)
+    )
+  }
+
+  test("from_csv - invalid data") {
+    val csvData = "---"
+    val schema = StructType(StructField("a", DoubleType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(csvData), gmtId),
+      InternalRow(null))
+
+    // Default mode is Permissive
+    checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), gmtId), InternalRow(null))
+  }
+
+  test("from_csv null input column") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
+      null
+    )
+  }
+
+  test("from_csv bad UTF-8") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId),
+      InternalRow(null))
+  }
+
+  test("from_csv with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+    val csvData1 = "2016-01-01T00:00:00.123Z"
+    var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 123)
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+    // The result doesn't change because the CSV string includes timezone string ("Z" here),
+    // which means the string represents the timestamp string in the timezone regardless of
+    // the timeZoneId parameter.
+    checkEvaluation(
+      CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+
+    val csvData2 = "2016-01-01T00:00:00"
+    for (tz <- Random.shuffle(DateTimeTestUtils.ALL_TIMEZONES).take(50)) {
--- End diff --

how long does this test take? 


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r223039004
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+   {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression
+with TimeZoneAwareExpression
+with CodegenFallback
+with ExpectsInputTypes
+with NullIntolerant {
+
+  override def nullable: Boolean = child.nullable
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema: StructType = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) =>
+if (rows.hasNext) {
+  rows.next()
+} else {
+  throw new IllegalArgumentException("Expected at least one row from CSV parser.")
--- End diff --

so we are fine with more than one row?
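
If we want to be strict about it, the converter could reject trailing rows as well. Just a sketch, not code from this PR; `strictConverter` is an illustrative name:

```
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical stricter converter: reject both empty output and
// more than one row coming back from the parser.
def strictConverter(rows: Iterator[InternalRow]): InternalRow = {
  if (!rows.hasNext) {
    throw new IllegalArgumentException("Expected at least one row from CSV parser.")
  }
  val first = rows.next()
  if (rows.hasNext) {
    throw new IllegalArgumentException("Expected exactly one row from CSV parser.")
  }
  first
}
```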


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-05 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222954838
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+   {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
--- End diff --

Right, we expect one row from `FailureSafeParser`. 
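
To spell out the contract assumed here — a simplified model, not the actual `FailureSafeParser` implementation:

```
import org.apache.spark.sql.catalyst.InternalRow

// Simplified model of the assumed contract: in PERMISSIVE mode one input
// record always yields exactly one output row — either the parsed row or
// a single placeholder (null/corrupt-record) row.
def failureSafe(
    rawParse: String => InternalRow,
    nullRow: InternalRow)(input: String): Iterator[InternalRow] = {
  try {
    Iterator.single(rawParse(input)) // happy path: one parsed row
  } catch {
    case _: Exception => Iterator.single(nullRow) // bad record: one placeholder row
  }
}
```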


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222894891
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2664,6 +2664,28 @@ def sequence(start, stop, step=None):
 _to_java_column(start), _to_java_column(stop), 
_to_java_column(step)))
 
 
+@ignore_unicode_prefix
+@since(3.0)
+def from_csv(col, schema, options={}):
+"""
+Parses a column containing a CSV string into a :class:`StructType`
--- End diff --

`StructType` -> `Row`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222895375
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+   {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
--- End diff --

the input string is expected to be a single CSV record, right? Shall we fail if it's not?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222895573
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+   {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
--- End diff --

This expression only returns null if the input is null. Shall we define `nullable` as `child.nullable`? And this expression should extend `NullIntolerant`.
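
In sketch form (this matches what the later revision of the diff ends up with):

```
// Null comes out only when null goes in, so let nullability follow the
// child and mark the expression NullIntolerant.
case class CsvToStructs(
    schema: StructType,
    options: Map[String, String],
    child: Expression,
    timeZoneId: Option[String] = None)
  extends UnaryExpression
    with TimeZoneAwareExpression
    with CodegenFallback
    with ExpectsInputTypes
    with NullIntolerant {

  override def nullable: Boolean = child.nullable
  // ...
}
```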


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222874417
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.Calendar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper 
with PlanTestBase {
+  val badCsv = "\u0000\u0000\u0000A\u0001AAA"
+
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
+  test("from_csv") {
+val csvData = "1"
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData), gmtId),
+  InternalRow(1)
+)
+  }
+
+  test("from_csv - invalid data") {
+val csvData = "---"
+val schema = StructType(StructField("a", DoubleType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(csvData), gmtId),
+  InternalRow(null))
+
+// Default mode is Permissive
+checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), 
gmtId), InternalRow(null))
+  }
+
+  test("from_csv null input column") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal.create(null, StringType), 
gmtId),
+  null
+)
+  }
+
+  test("from_csv bad UTF-8") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId),
+  InternalRow(null))
+  }
+
+  test("from_csv with timestamp") {
+val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+val csvData1 = "2016-01-01T00:00:00.123Z"
+var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+c.set(2016, 0, 1, 0, 0, 0)
+c.set(Calendar.MILLISECOND, 123)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+// The result doesn't change because the CSV string includes timezone 
string ("Z" here),
+// which means the string represents the timestamp string in the 
timezone regardless of
+// the timeZoneId parameter.
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+
+val csvData2 = "2016-01-01T00:00:00"
+for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
--- End diff --

It sounds good to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222807843
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
+
+  @transient lazy val parser = {
+val parsedOptions = new CSVOptions(options, true, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
+  throw new AnalysisException(s"from_csv() doesn't support the 
${mode.name} mode. " +
+s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}.")
+}
+val actualSchema =
+  StructType(nullableSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
--- End diff --

added


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222797840
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.Calendar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper 
with PlanTestBase {
+  val badCsv = "\u0000\u0000\u0000A\u0001AAA"
+
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
+  test("from_csv") {
+val csvData = "1"
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData), gmtId),
+  InternalRow(1)
+)
+  }
+
+  test("from_csv - invalid data") {
+val csvData = "---"
+val schema = StructType(StructField("a", DoubleType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(csvData), gmtId),
+  InternalRow(null))
+
+// Default mode is Permissive
+checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), 
gmtId), InternalRow(null))
+  }
+
+  test("from_csv null input column") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal.create(null, StringType), 
gmtId),
+  null
+)
+  }
+
+  test("from_csv bad UTF-8") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId),
+  InternalRow(null))
+  }
+
+  test("from_csv with timestamp") {
+val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+val csvData1 = "2016-01-01T00:00:00.123Z"
+var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+c.set(2016, 0, 1, 0, 0, 0)
+c.set(Calendar.MILLISECOND, 123)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+// The result doesn't change because the CSV string includes timezone 
string ("Z" here),
+// which means the string represents the timestamp string in the 
timezone regardless of
+// the timeZoneId parameter.
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+
+val csvData2 = "2016-01-01T00:00:00"
+for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
--- End diff --

Does it make sense to apply the same approach as in this PR: https://github.com/apache/spark/pull/22631
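
Concretely, that would be the sampling the updated test above adopts:

```
import scala.util.Random
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils

// The approach from #22631: exercise a random sample of timezones
// instead of iterating over all of them.
for (tz <- Random.shuffle(DateTimeTestUtils.ALL_TIMEZONES).take(50)) {
  // run the timestamp round-trip check for this timezone
}
```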


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222796379
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.Calendar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper 
with PlanTestBase {
+  val badCsv = "\u0000\u0000\u0000A\u0001AAA"
+
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
+  test("from_csv") {
+val csvData = "1"
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData), gmtId),
+  InternalRow(1)
+)
+  }
+
+  test("from_csv - invalid data") {
+val csvData = "---"
+val schema = StructType(StructField("a", DoubleType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(csvData), gmtId),
+  InternalRow(null))
+
+// Default mode is Permissive
+checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), 
gmtId), InternalRow(null))
+  }
+
+  test("from_csv null input column") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal.create(null, StringType), 
gmtId),
+  null
+)
+  }
+
+  test("from_csv bad UTF-8") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId),
+  InternalRow(null))
+  }
+
+  test("from_csv with timestamp") {
+val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+val csvData1 = "2016-01-01T00:00:00.123Z"
+var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+c.set(2016, 0, 1, 0, 0, 0)
+c.set(Calendar.MILLISECOND, 123)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+// The result doesn't change because the CSV string includes timezone 
string ("Z" here),
+// which means the string represents the timestamp string in the 
timezone regardless of
+// the timeZoneId parameter.
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+
+val csvData2 = "2016-01-01T00:00:00"
+for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
--- End diff --

@gatorsmile It takes ~9.5 seconds on my laptop. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222777373
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
+
+  @transient lazy val parser = {
+val parsedOptions = new CSVOptions(options, true, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
+  throw new AnalysisException(s"from_csv() doesn't support the 
${mode.name} mode. " +
+s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}.")
+}
+val actualSchema =
+  StructType(nullableSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
--- End diff --

Could we have a test case to ensure it works?
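
A sketch of such a test, reusing the suite's fixtures (`badCsv`, `gmtId`); the expected row assumes PERMISSIVE mode leaves the corrupt-record column out of parsing and fills it with the raw input:

```
test("from_csv invalid csv - corrupt record column") {
  // The corrupt-record column must be filtered out before parsing and
  // populated with the raw text when the record is malformed.
  val schema = new StructType()
    .add("a", IntegerType)
    .add("_unparsed", StringType)
  checkEvaluation(
    CsvToStructs(
      schema,
      Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_unparsed"),
      Literal(badCsv),
      gmtId),
    InternalRow(null, UTF8String.fromString(badCsv)))
}
```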


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222776820
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
--- End diff --

why not inline this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222773963
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
+
+  @transient lazy val parser = {
+val parsedOptions = new CSVOptions(options, true, timeZoneId.get)
--- End diff --

Use a named boolean argument here: `columnPruning = true`.
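
With the named argument the call site documents itself:

```
// Named argument makes the meaning of the boolean obvious at the call site.
val parsedOptions = new CSVOptions(options, columnPruning = true, timeZoneId.get)
```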


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222774540
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
+
+  @transient lazy val parser = {
+val parsedOptions = new CSVOptions(options, true, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
+  throw new AnalysisException(s"from_csv() doesn't support the 
${mode.name} mode. " +
+s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}.")
+}
+val actualSchema =
+  StructType(nullableSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
+val rawParser = new UnivocityParser(actualSchema, actualSchema, 
parsedOptions)
+new FailureSafeParser[String](
+  input => Seq(rawParser.parse(input)),
+  mode,
+  nullableSchema,
+  parsedOptions.columnNameOfCorruptRecord,
+  parsedOptions.multiLine)
+  }
+
+  override def dataType: DataType = nullableSchema
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+copy(timeZoneId = Option(timeZoneId))
--- End diff --

```
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
  copy(timeZoneId = Option(timeZoneId))
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222774199
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with 
CodegenFallback with ExpectsInputTypes {
+
+  override def nullable: Boolean = true
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next() else null
+
+  @transient lazy val parser = {
+val parsedOptions = new CSVOptions(options, true, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
+  throw new AnalysisException(s"from_csv() doesn't support the 
${mode.name} mode. " +
--- End diff --

test case? 
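
Something like this could cover it — a sketch against the suite above, assuming `checkEvaluation` surfaces the `AnalysisException` as the cause of a `TestFailedException`:

```
test("from_csv unsupported mode") {
  val csvData = "---"
  val schema = StructType(StructField("a", DoubleType) :: Nil)
  // DROPMALFORMED is neither PERMISSIVE nor FAILFAST, so analysis should fail.
  val exception = intercept[TestFailedException] {
    checkEvaluation(
      CsvToStructs(schema, Map("mode" -> DropMalformedMode.name), Literal(csvData), gmtId),
      InternalRow(null))
  }.getCause
  assert(exception.getMessage.contains(
    "from_csv() doesn't support the DROPMALFORMED mode"))
}
```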


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222772770
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.Calendar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper 
with PlanTestBase {
+  val badCsv = "\u0000\u0000\u0000A\u0001AAA"
+
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
+  test("from_csv") {
+val csvData = "1"
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData), gmtId),
+  InternalRow(1)
+)
+  }
+
+  test("from_csv - invalid data") {
+val csvData = "---"
+val schema = StructType(StructField("a", DoubleType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(csvData), gmtId),
+  InternalRow(null))
+
+// Default mode is Permissive
+checkEvaluation(CsvToStructs(schema, Map.empty, Literal(csvData), 
gmtId), InternalRow(null))
+  }
+
+  test("from_csv null input column") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal.create(null, StringType), 
gmtId),
+  null
+)
+  }
+
+  test("from_csv bad UTF-8") {
+val schema = StructType(StructField("a", IntegerType) :: Nil)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(badCsv), gmtId),
+  InternalRow(null))
+  }
+
+  test("from_csv with timestamp") {
+val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+val csvData1 = "2016-01-01T00:00:00.123Z"
+var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+c.set(2016, 0, 1, 0, 0, 0)
+c.set(Calendar.MILLISECOND, 123)
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), gmtId),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+// The result doesn't change because the CSV string includes timezone 
string ("Z" here),
+// which means the string represents the timestamp string in the 
timezone regardless of
+// the timeZoneId parameter.
+checkEvaluation(
+  CsvToStructs(schema, Map.empty, Literal(csvData1), Option("PST")),
+  InternalRow(c.getTimeInMillis * 1000L)
+)
+
+val csvData2 = "2016-01-01T00:00:00"
+for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
--- End diff --

how long does this test case take?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r222771917
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
--- End diff --

Also add an example with options.
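
(The later revisions quoted above do exactly this: the `SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))` example.)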


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r221449056
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 ---
@@ -520,7 +520,10 @@ object FunctionRegistry {
 castAlias("date", DateType),
 castAlias("timestamp", TimestampType),
 castAlias("binary", BinaryType),
-castAlias("string", StringType)
+castAlias("string", StringType),
+
+// csv
+expression[CsvToStructs]("from_csv")
--- End diff --

This sounds like a reasonable change. cc @rxin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-29 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r221424336
  
--- Diff: R/pkg/R/functions.R ---
@@ -2203,6 +2209,23 @@ setMethod("from_json", signature(x = "Column", 
schema = "characterOrstructType")
 column(jc)
   })
 
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @aliases from_csv from_csv,Column,character-method
+#' @note from_csv since 2.5.0
--- End diff --

added


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-28 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r221415711
  
--- Diff: R/pkg/R/functions.R ---
@@ -2203,6 +2209,23 @@ setMethod("from_json", signature(x = "Column", 
schema = "characterOrstructType")
 column(jc)
   })
 
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @aliases from_csv from_csv,Column,character-method
+#' @note from_csv since 2.5.0
--- End diff --

consider adding an example as in https://github.com/apache/spark/pull/22379/files#diff-d97f9adc2dcac0703568c799ff106987R2180?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r221047590
  
--- Diff: R/pkg/R/functions.R ---
@@ -2204,6 +2210,26 @@ setMethod("from_json", signature(x = "Column", 
schema = "characterOrstructType")
   })
 
 #' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @aliases from_csv from_csv,Column,character-method
+#' @note from_csv since 2.5.0
+setMethod("from_csv", signature(x = "Column", schema = "character"),
+  function(x, schema, ...) {
+options <- varargsToStrEnv(...)
+jc <- callJStatic("org.apache.spark.sql.functions",
+  "from_csv",
+  x@jc, schema, options)
+column(jc)
+  })
+
+#' @details
+#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 
02:40:00.0', interprets it as a
+#' time in UTC, and renders that time as a timestamp in the given time 
zone. For example, 'GMT+1'
+#' would yield '2017-07-14 03:40:00.0'.
--- End diff --

Please remove the above three lines, which were removed via 
https://github.com/apache/spark/commit/ff876137faba1802b66ecd483ba15f6ccd83ffc5#diff-d97f9adc2dcac0703568c799ff106987L2207
 by @cloud-fan.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r220409483
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 ---
@@ -520,7 +520,10 @@ object FunctionRegistry {
 castAlias("date", DateType),
 castAlias("timestamp", TimestampType),
 castAlias("binary", BinaryType),
-castAlias("string", StringType)
+castAlias("string", StringType),
+
+// csv
+expression[CsvToStructs]("from_csv")
--- End diff --

Yea, it looks so. That's actually the concern in 
https://github.com/apache/spark/pull/22379#discussion_r216443482


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r220408893
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 ---
@@ -520,7 +520,10 @@ object FunctionRegistry {
 castAlias("date", DateType),
 castAlias("timestamp", TimestampType),
 castAlias("binary", BinaryType),
-castAlias("string", StringType)
+castAlias("string", StringType),
+
+// csv
+expression[CsvToStructs]("from_csv")
--- End diff --

So the only reason to move the CSV functionality to catalyst is to register it 
as a built-in function?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-25 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r220346397
  
--- Diff: R/pkg/NAMESPACE ---
@@ -275,6 +275,7 @@ exportMethods("%<=>%",
   "format_number",
   "format_string",
   "from_json",
+  "from_csv",
--- End diff --

sorted


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-25 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r220345834
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  val noOptions = Map[String, String]()
+
+  test("from_csv") {
+val df = Seq("1").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_csv($"value", schema, noOptions)),
+  Row(Row(1)) :: Nil)
+  }
+
+  test("from_csv with option") {
+val df = Seq("26/08/2015 18:00").toDS()
+val schema = new StructType().add("time", TimestampType)
+val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
+
+checkAnswer(
+  df.select(from_csv($"value", schema, options)),
+  Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
+  }
+
+  test("from_csv missing columns") {
--- End diff --

I moved most of the tests from `CsvFunctionsSuite` to `CsvExpressionsSuite`. 
I left 2 tests covering both signatures of the `from_csv` method.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-25 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r220345504
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---
@@ -0,0 +1,12 @@
+-- from_csv
+describe function from_csv;
+describe function extended from_csv;
+select from_csv('1', 'a INT');
--- End diff --

I removed it.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-25 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r220345402
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---
@@ -0,0 +1,12 @@
+-- from_csv
+describe function from_csv;
+describe function extended from_csv;
--- End diff --

I moved most of the tests from `CsvFunctionsSuite` to `CsvExpressionsSuite`. 
I left 2 tests covering both signatures of the `from_csv` method.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-24 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r219947088
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---
@@ -0,0 +1,12 @@
+-- from_csv
+describe function from_csv;
+describe function extended from_csv;
--- End diff --

Do you mean both `describe` statements or only the last one? I would actually 
leave them to keep consistency with the `from_json` tests.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-24 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r219945469
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  val noOptions = Map[String, String]()
+
+  test("from_csv") {
+val df = Seq("1").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_csv($"value", schema, noOptions)),
+  Row(Row(1)) :: Nil)
+  }
+
+  test("from_csv with option") {
+val df = Seq("26/08/2015 18:00").toDS()
+val schema = new StructType().add("time", TimestampType)
+val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
+
+checkAnswer(
+  df.select(from_csv($"value", schema, options)),
+  Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
+  }
+
+  test("from_csv missing columns") {
--- End diff --

I just looked at the tests for `from_json` and ported the relevant ones. I thought 
it would be convenient to have the same test structure. 


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-24 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r219815230
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  val noOptions = Map[String, String]()
+
+  test("from_csv") {
+val df = Seq("1").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_csv($"value", schema, noOptions)),
+  Row(Row(1)) :: Nil)
+  }
+
+  test("from_csv with option") {
+val df = Seq("26/08/2015 18:00").toDS()
+val schema = new StructType().add("time", TimestampType)
+val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
+
+checkAnswer(
+  df.select(from_csv($"value", schema, options)),
+  Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
+  }
+
+  test("from_csv missing columns") {
--- End diff --

@MaxGekk, I think we just need a few end-to-end tests here, and the other tests 
can go into `CsvExpressionsSuite.scala`.
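A minimal sketch of an expression-level test, assuming `CsvExpressionsSuite` follows the `JsonExpressionsSuite` pattern (`ExpressionEvalHelper.checkEvaluation` against `CsvToStructs` directly, with an explicit time zone id and no DataFrame round trip):

```Scala
// Sketch only: evaluate the CsvToStructs expression itself.
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
  CsvToStructs(schema, Map.empty, Literal("1"), Option("GMT")),
  InternalRow(1))
```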


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-24 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r219814691
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---
@@ -0,0 +1,12 @@
+-- from_csv
+describe function from_csv;
+describe function extended from_csv;
--- End diff --

Shall we not test this? It requires fixing the tests accordingly every time 
we fix the doc.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-24 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r219814329
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---
@@ -0,0 +1,12 @@
+-- from_csv
+describe function from_csv;
+describe function extended from_csv;
+select from_csv('1', 'a INT');
--- End diff --

I think we don't need this test case since the below one tests it.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-24 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r219813519
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2637,6 +2637,29 @@ def sequence(start, stop, step=None):
 _to_java_column(start), _to_java_column(stop), 
_to_java_column(step)))
 
 
+@ignore_unicode_prefix
+@since(2.5)
+def from_csv(col, schema, options={}):
+"""
+Parses a column containing a CSV string into a :class:`StructType`
+with the specified schema. Returns `null`, in the case of an 
unparseable string.
+
+:param col: string column in CSV format
+:param schema: a string with schema in DDL format to use when parsing 
the CSV column.
+:param options: options to control parsing. accepts the same options 
as the CSV datasource
+
+>>> from pyspark.sql.types import *
--- End diff --

Looks like we don't need this import.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-15 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r217901558
  
--- Diff: R/pkg/NAMESPACE ---
@@ -275,6 +275,7 @@ exportMethods("%<=>%",
   "format_number",
   "format_string",
   "from_json",
+  "from_csv",
--- End diff --

please sort this?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-15 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r217901588
  
--- Diff: R/pkg/R/functions.R ---
@@ -2202,6 +2208,24 @@ setMethod("from_json", signature(x = "Column", 
schema = "characterOrstructType")
 column(jc)
   })
 
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @aliases from_csv from_csv,Column,character-method
+#'
--- End diff --

A newline with `#'` is significant in roxygen; please remove this line.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-11 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216875875
  
--- Diff: R/pkg/R/functions.R ---
@@ -3720,3 +3720,22 @@ setMethod("current_timestamp",
 jc <- callJStatic("org.apache.spark.sql.functions", 
"current_timestamp")
 column(jc)
   })
+
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @param schema a DDL-formatted string
+#' @aliases from_csv from_csv,Column,character-method
+#'
+#' @note from_csv since 3.0.0
+setMethod("from_csv", signature(x = "Column", schema = "character"),
+  function(x, schema, ...) {
--- End diff --

here 
https://github.com/apache/spark/blob/d2bfd9430f05d006accdecb6a62ed659fbd6a2f8/R/pkg/R/functions.R#L199


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-11 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216875804
  
--- Diff: R/pkg/R/functions.R ---
@@ -3720,3 +3720,22 @@ setMethod("current_timestamp",
 jc <- callJStatic("org.apache.spark.sql.functions", 
"current_timestamp")
 column(jc)
   })
+
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @param schema a DDL-formatted string
+#' @aliases from_csv from_csv,Column,character-method
+#'
+#' @note from_csv since 3.0.0
+setMethod("from_csv", signature(x = "Column", schema = "character"),
+  function(x, schema, ...) {
--- End diff --

No no, this will break - I am referring to finding the original doc for `@rdname 
column_collection_functions`, which already has `...` documented, and then adding 
this in.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-11 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216734651
  
--- Diff: R/pkg/R/functions.R ---
@@ -3720,3 +3720,22 @@ setMethod("current_timestamp",
 jc <- callJStatic("org.apache.spark.sql.functions", 
"current_timestamp")
 column(jc)
   })
+
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @param schema a DDL-formatted string
+#' @aliases from_csv from_csv,Column,character-method
+#'
+#' @note from_csv since 3.0.0
+setMethod("from_csv", signature(x = "Column", schema = "character"),
+  function(x, schema, ...) {
--- End diff --

@felixcheung I added a doc but when I run `run-tests.sh`, it outputs the 
warning:
```
* checking Rd \usage sections ... WARNING
Duplicated \argument entries in documentation object 
'column_collection_functions':
  ‘schema’

Functions with \usage entries need to have the appropriate \alias
entries, and all their arguments documented.
The \usage entries must correspond to syntactically valid R code.
See chapter ‘Writing R documentation files’ in the ‘Writing R
Extensions’ manual.
```
Is it ok? If it isn't, any ideas what could cause it?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216538924
  
--- Diff: R/pkg/R/functions.R ---
@@ -3720,3 +3720,22 @@ setMethod("current_timestamp",
 jc <- callJStatic("org.apache.spark.sql.functions", 
"current_timestamp")
 column(jc)
   })
+
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @param schema a DDL-formatted string
+#' @aliases from_csv from_csv,Column,character-method
+#'
+#' @note from_csv since 3.0.0
+setMethod("from_csv", signature(x = "Column", schema = "character"),
+  function(x, schema, ...) {
--- End diff --

Can you add to the doc for `...` (in column_collection_functions) to 
indicate the options used by this function, if there is anything new?


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216510114
  
--- Diff: sql/catalyst/pom.xml ---
@@ -103,6 +103,12 @@
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>2.7.3</version>
+      <type>jar</type>
+    </dependency>
--- End diff --

Ideally we should just make the CSV one a separate external module, and that 
would be the right way given the discussion.

The current change isn't necessarily blocked, but I can see the point that 
moving the dependency makes further refactoring potentially harder, as 
pointed out. It looks like many people agreed on separating them.

The concern here is that it sounds like we are stepping back from the ideal 
approach.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216509108
  
--- Diff: R/pkg/R/functions.R ---
@@ -3720,3 +3720,22 @@ setMethod("current_timestamp",
 jc <- callJStatic("org.apache.spark.sql.functions", 
"current_timestamp")
 column(jc)
   })
+
+#' @details
+#' \code{from_csv}: Parses a column containing a CSV string into a Column 
of \code{structType}
+#' with the specified \code{schema}.
+#' If the string is unparseable, the Column will contain the value NA.
+#'
+#' @rdname column_collection_functions
+#' @param schema a DDL-formatted string
+#' @aliases from_csv from_csv,Column,character-method
+#'
+#' @note from_csv since 3.0.0
+setMethod("from_csv", signature(x = "Column", schema = "character"),
+  function(x, schema, ...) {
+options <- varargsToStrEnv(...)
+jc <- callJStatic("org.apache.spark.sql.functions",
+  "from_csv",
+  x@jc, schema, options)
+column(jc)
+  })
--- End diff --

newline at the end


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-10 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216446691
  
--- Diff: sql/catalyst/pom.xml ---
@@ -103,6 +103,12 @@
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>2.7.3</version>
+      <type>jar</type>
+    </dependency>
--- End diff --

I added the dependency only because I had to move `UnivocityParser` from 
`sql/core` to `sql/catalyst`, since classes in `sql/core` are not accessible 
from `sql/catalyst`. Please tell me what the right approach is here.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-10 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r216443482
  
--- Diff: sql/catalyst/pom.xml ---
@@ -103,6 +103,12 @@
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>2.7.3</version>
+      <type>jar</type>
+    </dependency>
--- End diff --

Hi, @MaxGekk, @gatorsmile, and @cloud-fan.

I know this is the same approach as `from_json`, but suddenly I'm 
wondering if this is the right evolution direction, `sql` -> `catalyst`. 
Recently, we made `avro` an external module and [the development direction 
was the 
opposite](https://github.com/apache/spark/pull/21742#discussion_r201569743). If 
we put this into `catalyst`, it becomes harder to separate it from the `sql` module.
> Ideally, we should separate parquet, orc and other built-in data sources 
from sql module.


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-09-10 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/22379

[SPARK-25393][SQL] Adding new function from_csv()

## What changes were proposed in this pull request?

The PR adds a new function `from_csv()`, similar to `from_json()`, to parse 
columns containing CSV strings. I added the following method:
```Scala
def from_csv(e: Column, schema: StructType, options: Map[String, String]): 
Column
```
and this signature to call it from Python, R and Java:
```Scala
def from_csv(e: Column, schema: String, options: java.util.Map[String, 
String]): Column
```
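A quick usage sketch of the two signatures (names and data here are illustrative only; assumes `spark.implicits._` is in scope):

```Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val df = Seq("1,foo").toDF("value")

// StructType schema plus a Scala Map of options:
val schema = new StructType().add("a", IntegerType).add("b", StringType)
df.select(from_csv($"value", schema, Map.empty[String, String])).show()

// DDL schema string plus java.util.Map, the overload exposed to Python, R and Java:
df.select(from_csv($"value", "a INT, b STRING", new java.util.HashMap[String, String]())).show()
```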

## How was this patch tested?

Added new test suites `CsvExpressionsSuite` and `CsvFunctionsSuite`, plus SQL 
tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 from_csv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22379.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22379


commit 344c2ab62095f91e161814230082f0d30c257365
Author: Maxim Gekk 
Date:   2018-09-09T13:07:34Z

Initial implementation of CsvToStruct

commit 5905019dd3fbe9dbba20505bf9df2b8ca62b4849
Author: Maxim Gekk 
Date:   2018-09-09T18:22:32Z

Added CSV Expression Test Suite

commit c5ac43273e70199467c28aaa40591042eac096aa
Author: Maxim Gekk 
Date:   2018-09-09T18:42:49Z

Register from_csv functions and add tests

commit bd2124cfcf00b8e049154dad9903b22b176347bb
Author: Maxim Gekk 
Date:   2018-09-09T19:07:00Z

Fix imports

commit b9bb081a3d655dbdece9d8a998853eda68171553
Author: Maxim Gekk 
Date:   2018-09-09T19:23:16Z

Adding SQL tests

commit 14ae619e2d6f18156fb24f0e391fb7ad11548ddd
Author: Maxim Gekk 
Date:   2018-09-09T20:18:33Z

from_csv for PySpark

commit cfb2ac3844ccc7a374b3c3ec58150264b545269f
Author: Maxim Gekk 
Date:   2018-09-10T08:47:27Z

from_csv for SparkR




---
