[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-05-07 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r186593019
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType
--- End diff --

In the latest fix, I used `NullType` for unresolved types, and `FileStreamSource` resolves those types once non-null values appear. As suggested in SPARK-12436, I think we might need to fall back to `StringType` just before the `Sink`s, e.g.,
https://github.com/apache/spark/blob/76ecd095024a658bf68e5db658e4416565b30c17/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L492
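
For illustration, a minimal sketch of such a fallback (a hypothetical helper, not the PR's actual code): walk the schema and replace every still-unresolved `NullType` with `StringType` right before the schema reaches a `Sink`.

```scala
import org.apache.spark.sql.types._

// Hypothetical: recursively replace unresolved NullType with StringType,
// so sinks that reject NullType (parquet, orc, csv) still get a writable schema.
def fallbackToStringType(dt: DataType): DataType = dt match {
  case NullType => StringType
  case ArrayType(elementType, containsNull) =>
    ArrayType(fallbackToStringType(elementType), containsNull)
  case MapType(keyType, valueType, valueContainsNull) =>
    MapType(fallbackToStringType(keyType), fallbackToStringType(valueType), valueContainsNull)
  case StructType(fields) =>
    StructType(fields.map(f => f.copy(dataType = fallbackToStringType(f.dataType))))
  case other => other
}
```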


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-05-07 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r186589216
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
@@ -0,0 +1,23 @@
+/* ... (Apache License 2.0 header, identical to the quote above) ... */
+
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType
--- End diff --

I think `removeNullRecursively` could be a little time-consuming (it parses the JSON twice); why can't we do the same thing directly in `JacksonParser`? e.g.,

https://github.com/apache/spark/pull/20929/files#diff-635e02b2d1ce4ad1675b0350ccac0c10R334
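
A rough sketch of that single-pass idea (hypothetical helper, not the linked diff): instead of rewriting the JSON record first, drop the still-`NullType` entries while folding the inferred field types.

```scala
import org.apache.spark.sql.types._

// Hypothetical: filter NullType fields out of the inferred struct in one pass,
// rather than preprocessing the JSON text and parsing it twice.
def foldInferredFields(fields: Seq[(String, DataType)]): StructType =
  StructType(fields.collect {
    case (name, dataType) if dataType != NullType => StructField(name, dataType)
  })
```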


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-05-07 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r186584474
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
@@ -0,0 +1,23 @@
+/* ... (Apache License 2.0 header, identical to the quote above) ... */
+
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType
--- End diff --

In the first attempt, I used the new type instead of `NullType` because some `Sink`s (e.g., `FileStreamSink`) could not handle `NullType`:
```
// parquet
java.lang.RuntimeException: Unsupported data type NullType.
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.org$apache$spark$sql$execution$datasources$parquet$ParquetWriteSupport$$makeWriter(ParquetWriteSupport.scala:206)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

// orc
java.lang.IllegalArgumentException: Can't parse category at 'struct<...>'
  at org.apache.orc.TypeDescription.parseCategory(TypeDescription.java:223)
  at org.apache.orc.TypeDescription.parseType(TypeDescription.java:332)
  at org.apache.orc.TypeDescription.parseStruct(TypeDescription.java:327)
  at org.apache.orc.TypeDescription.parseType(TypeDescription.java:385)
  at org.apache.orc.TypeDescription.fromString(TypeDescription.java:406)

// csv
java.lang.UnsupportedOperationException: CSV data source does not support null data type.
  at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType$1(CSVUtils.scala:130)
  at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:134)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
```
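
For context, a minimal repro of the parquet failure above (a sketch, assuming a plain `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.functions.lit

// lit(null) yields a NullType column, which ParquetWriteSupport rejects.
spark.range(1)
  .select(lit(null).as("n"))
  .write.parquet("/tmp/nulltype-out") // java.lang.RuntimeException: Unsupported data type NullType.
```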
So, in the previous fix, I tried adding `TypePlaceholder`, which inherits from `StringType`; that type could be handled correctly by all the `Sink`s, but it was too tricky.

Regarding the suggestion, does `NullType, ArrayType(NullType), etc should be dropped` mean that we need to handle an inferred schema as follows? e.g.,
```
Inferred schema: "StructType<a: INT, b: NULL>"
-> Schema used in FileStreamSource: "StructType<a: INT>"
```
Is this right?


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-05-07 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r186581349
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =
+    buildConf("spark.sql.streaming.schemaInference.ignoreNullFields")
--- End diff --

ok, I used `dropFieldIfAllNull`.


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-05-07 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r186581071
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =
--- End diff --

ok, I dropped the conf in `SQLConf`.


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-04-09 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r180291983
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =
+    buildConf("spark.sql.streaming.schemaInference.ignoreNullFields")
--- End diff --

`ignoreNullFields` is not precise. For example:

```json
{"a": 1}
{"a": null}
```

The `null` value is not ignored. I would suggest `dropFieldIfAllNull`.
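
Hypothetical usage once the option is wired into the JSON data source (a sketch; the option name is just the suggestion above):

```scala
// File stream sources require a schema unless streaming schema inference is on.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

val df = spark.readStream
  .option("dropFieldIfAllNull", "true") // drop fields whose values are all null
  .json("/path/to/input")
```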


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-04-09 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r180293096
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -624,6 +624,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-23772 Ignore column of all null values or empty array during JSON schema inference") {
--- End diff --

We need more test coverage. I have a similar internal implementation that 
tests the following cases (ignore the actual test, just look at the example 
records):

```scala
  test("null") {
    assert(removeNullRecursively("null") === "null")
  }

  test("empty string") {
    assert(removeNullRecursively("\"\"") === "\"\"")
  }

  test("empty object") {
    assert(removeNullRecursively("{}") === "null")
  }

  test("object with all null values") {
    val json = """{"a":null,"b":null,"c":null}"""
    assert(removeNullRecursively(json) === "null")
  }

  test("object with some null fields") {
    val json = """{"a":null,"b":"c","d":null,"e":"f"}"""
    val expected = """{"b":"c","e":"f"}"""
    assert(removeNullRecursively(json) === expected)
  }

  test("object with some nested null values") {
    val json = """{"a":{},"b":{"c":null},"d":{"c":"e"},"f":{"c":null,"g":"h"}}"""
    val expected = """{"d":{"c":"e"},"f":{"g":"h"}}"""
    assert(removeNullRecursively(json) === expected)
  }

  test("array with all null elements") {
    val json = """[null,null,{},{"a":null}]"""
    val expected = "null"
    assert(removeNullRecursively(json) === expected)
  }

  test("array with some null elements") {
    // TODO: is it an issue if we convert an empty object to null in an array?
    val json = """[null,"a",null,{},"b"]"""
    val expected = """[null,"a",null,null,"b"]"""
    assert(removeNullRecursively(json) === expected)
  }
```


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-04-09 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r180291486
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
@@ -0,0 +1,23 @@
+/* ... (Apache License 2.0 header, identical to the quote above) ... */
+
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType
--- End diff --

Is it necessary to introduce a new DataType? Would it behave the same if we used `NullType`? With the flag on, at the end of schema inference, `NullType`, `ArrayType(NullType)`, etc. should be dropped instead of falling back to `StringType`. Basically, during schema inference, we keep the type that reveals more details, for example:

```
(NullType, ArrayType(NullType)) => ArrayType(NullType)
(ArrayType(NullType), ArrayType(StructType(Field("a", NullType)))) => ArrayType(StructType(Field("a", NullType)))
```
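
A hedged sketch of that preference (hypothetical helper, not Spark's actual type-merging code):

```scala
import org.apache.spark.sql.types._

// Between two candidate types for the same field, keep the one that reveals
// more structure; NullType carries no information, so the other side wins.
def preferMoreDetailed(a: DataType, b: DataType): DataType = (a, b) match {
  case (NullType, other) => other
  case (other, NullType) => other
  case (ArrayType(ea, n1), ArrayType(eb, n2)) =>
    ArrayType(preferMoreDetailed(ea, eb), n1 || n2)
  case _ => a // a real implementation would also merge structs field by field
}
```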

At the end, we implement a util method that determines whether a field is all null and, if so, drops it; this should be done recursively. I have an internal implementation of similar logic, but it operates on the JSON record itself; you might want to apply the same idea to data types (a sketch follows the code below).

```scala
// Needs json4s on the classpath (bundled with Spark).
import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

  /**
   * Removes null fields recursively from the input JSON record.
   * An array is null if all its elements are null.
   * An object is null if all its values are null.
   */
  def removeNullRecursively(jsonStr: String): String = {
    val json = parse(jsonStr)
    val cleaned = doRemoveNullRecursively(json)
    compact(render(cleaned)) // should handle null correctly
  }

  private def doRemoveNullRecursively(value: JValue): JValue = {
    value match {
      case null =>
        null

      case JNull =>
        null

      case JArray(values) =>
        val cleaned = values.map(doRemoveNullRecursively)
        if (cleaned.exists(_ != null)) {
          JArray(cleaned)
        } else {
          null
        }

      case JObject(pairs) =>
        val cleaned = pairs.flatMap { case (k, v) =>
          val cv = doRemoveNullRecursively(v)
          if (cv != null) {
            Some((k, cv))
          } else {
            None
          }
        }
        if (cleaned.nonEmpty) {
          JObject(cleaned)
        } else {
          null
        }

      // all other types are non-null
      case _ =>
        value
    }
  }
```
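
A sketch of the data-type analogue (hypothetical helper, under the assumption that all-null fields have been inferred as `NullType`):

```scala
import org.apache.spark.sql.types._

// Recursively drop NullType fields from an inferred type; returns None when
// the whole type collapses, mirroring removeNullRecursively returning "null".
def dropNullTypes(dt: DataType): Option[DataType] = dt match {
  case NullType => None
  case ArrayType(elementType, containsNull) =>
    dropNullTypes(elementType).map(ArrayType(_, containsNull))
  case StructType(fields) =>
    val kept = fields.flatMap { f =>
      dropNullTypes(f.dataType).map(t => f.copy(dataType = t))
    }
    if (kept.nonEmpty) Some(StructType(kept)) else None
  case other => Some(other)
}
```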


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-04-09 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r180291939
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =
--- End diff --

* I was expecting a configuration on the JSON data source instead of a Spark SQL conf. Consider, for instance, how one would implement this feature in a data source that lives outside Spark.
* Since Spark SQL cannot control the behavior of external data source implementations, the flag won't work unless external data sources recognize it. This is another reason to put it under the JSON data source.


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-03-30 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r178415303
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
@@ -0,0 +1,23 @@
+/* ... (Apache License 2.0 header, identical to the quote above) ... */
+
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType
--- End diff --

I just added this weird type for the prototype, so it needs more thought. I don't have a better idea right now, so I'd welcome any suggestions.


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-03-28 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r177925982
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -624,6 +624,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-23772 Ignore column of all null values or empty array during JSON schema inference") {
--- End diff --

@mengxr Does this test match the intention you described in the JIRA? (I just want to confirm before I brush up the code.)


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...

2018-03-28 Thread maropu
GitHub user maropu opened a pull request:

https://github.com/apache/spark/pull/20929

[SPARK-23772][SQL][WIP] Provide an option to ignore column of all null values or empty array during JSON schema inference

## What changes were proposed in this pull request?
This PR adds a SQL option `spark.sql.streaming.schemaInference.ignoreNullFields` to ignore columns whose values are all null, or all empty arrays, during JSON schema inference. When non-null values appear in subsequent batches, the column's type is resolved from those values.
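
For example, given these input records in the first batch (illustrative data, not from the PR):

```json
{"a": 1, "b": null}
{"a": 2, "b": null}
```

With the option enabled, the schema inferred for this batch would contain only `a`; `b` would get a concrete type once a later batch carries a non-null value for it.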

## How was this patch tested?
Added tests in `FileStreamSourceSuite`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-23772

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20929.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20929


commit 876da84a7da9dbdc408e153b9e3dc17776a0c9db
Author: Takeshi Yamamuro 
Date:   2018-03-28T14:00:09Z

WIP




---
