[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r186593019

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType

--- End diff --

In the latest fix, I used `NullType` for unresolved types, and `FileStreamSource` resolves the types when we see non-null values. As suggested in `SPARK-12436`, I think we might need to fall back to `StringType` just before `Sink`s, e.g., https://github.com/apache/spark/blob/76ecd095024a658bf68e5db658e4416565b30c17/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L492

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
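For illustration, the `StringType` fallback described above could be sketched as a recursive rewrite over the inferred schema. This is a hypothetical helper, not code from this PR; the object and method names are assumptions:

```scala
import org.apache.spark.sql.types._

// Hypothetical sketch: replace any still-unresolved NullType with StringType
// right before the schema reaches a Sink, as suggested in the comment above.
object NullTypeFallback {
  def toStringFallback(dt: DataType): DataType = dt match {
    case NullType => StringType
    case ArrayType(et, containsNull) =>
      ArrayType(toStringFallback(et), containsNull)
    case MapType(kt, vt, valueContainsNull) =>
      MapType(toStringFallback(kt), toStringFallback(vt), valueContainsNull)
    case StructType(fields) =>
      StructType(fields.map(f => f.copy(dataType = toStringFallback(f.dataType))))
    case other => other
  }
}
```

The rewrite only touches leaf `NullType`s, so any types already resolved from non-null values in earlier batches are left intact.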
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r186589216

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType

--- End diff --

I think `removeNullRecursively` could be a little time-consuming (it parses the JSON twice); why can't we do the same thing directly in `JacksonParser`? e.g., https://github.com/apache/spark/pull/20929/files#diff-635e02b2d1ce4ad1675b0350ccac0c10R334

---
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r186584474

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType

--- End diff --

In the first attempt, I used the new type instead of `NullType` because some `Sink`s (`FileStreamSink`) could not handle `NullType`:

```
// parquet
java.lang.RuntimeException: Unsupported data type NullType.
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.org$apache$spark$sql$execution$datasources$parquet$ParquetWriteSupport$$makeWriter(ParquetWriteSupport.scala:206)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

// orc
java.lang.IllegalArgumentException: Can't parse category at 'struct>'
  at org.apache.orc.TypeDescription.parseCategory(TypeDescription.java:223)
  at org.apache.orc.TypeDescription.parseType(TypeDescription.java:332)
  at org.apache.orc.TypeDescription.parseStruct(TypeDescription.java:327)
  at org.apache.orc.TypeDescription.parseType(TypeDescription.java:385)
  at org.apache.orc.TypeDescription.fromString(TypeDescription.java:406)

// csv
java.lang.UnsupportedOperationException: CSV data source does not support null data type.
  at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType$1(CSVUtils.scala:130)
  at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:134)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
```

So, in the previous fix, I tried to add `PlaceholderType`, inherited from `StringType`; this type could be handled correctly in all the `Sink`s, but it was too tricky. In the suggestion, does `NullType, ArrayType(NullType), etc should be dropped` mean that we need to handle an inferred schema as follows?

```
Inferred schema: "StructType" -> Schema used in FileStreamSource: "StructType"
```

Is this right?

---
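As a sketch of the alternative being discussed (dropping still-unresolved fields from the inferred schema instead of introducing a placeholder type), a hypothetical util over `DataType` might look like the following. The object and method names are assumptions, not code from this PR:

```scala
import org.apache.spark.sql.types._

// Hypothetical sketch: at the end of schema inference, drop fields whose type
// still carries no information (NullType, ArrayType(NullType), a struct whose
// fields are all null, ...), recursively.
object DropNullFields {
  // Returns None when the type carries no information at all.
  def dropAllNull(dt: DataType): Option[DataType] = dt match {
    case NullType => None
    case ArrayType(et, containsNull) =>
      dropAllNull(et).map(ArrayType(_, containsNull))
    case StructType(fields) =>
      val kept = fields.flatMap { f =>
        dropAllNull(f.dataType).map(t => f.copy(dataType = t))
      }
      if (kept.nonEmpty) Some(StructType(kept)) else None
    case other => Some(other)
  }
}
```

Applied to a schema like `StructType(a: NullType, b: StringType)`, the helper would keep only `b`; a schema that is entirely null would collapse to `None`.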
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r186581349

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
+
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =
+    buildConf("spark.sql.streaming.schemaInference.ignoreNullFields")

--- End diff --

ok, I used `dropFieldIfAllNull`.

---
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r186581071

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
+
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =

--- End diff --

ok, I dropped the conf in `SQLConf`.

---
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r180291983

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
+
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =
+    buildConf("spark.sql.streaming.schemaInference.ignoreNullFields")

--- End diff --

`ignoreNullFields` is not precise. For example:

```json
{"a": 1}
{"a": null}
```

The `null` value is not ignored here; the field is dropped only when all of its values are null. I would suggest `dropFieldIfAllNull`.

---
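Assuming the flag ends up as a per-source JSON option named `dropFieldIfAllNull` (as suggested above), reader-side usage might look like the sketch below. The input path is hypothetical, and whether the option is honored depends on the final shape of this PR:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("drop-all-null-fields-demo")
  .getOrCreate()

// Streaming schema inference must be enabled for file stream sources.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

// Hypothetical: pass the flag as a JSON data source option rather than a SQLConf.
val stream = spark.readStream
  .format("json")
  .option("dropFieldIfAllNull", "true")
  .load("/tmp/json-input") // hypothetical input directory
```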
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r180293096

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -624,6 +624,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 }
+
+  test("SPARK-23772 Ignore column of all null values or empty array during JSON schema inference") {

--- End diff --

We need more test coverage. I have a similar internal implementation that tests the following cases (ignore the actual test, just look at the example records):

```scala
test("null") {
  assert(removeNullRecursively("null") === "null")
}

test("empty string") {
  assert(removeNullRecursively("\"\"") === "\"\"")
}

test("empty object") {
  assert(removeNullRecursively("{}") === "null")
}

test("object with all null values") {
  val json = """{"a":null,"b":null, "c":null}"""
  assert(removeNullRecursively(json) === "null")
}

test("object with some null fields") {
  val json = """{"a":null,"b":"c","d":null,"e":"f"}"""
  val expected = """{"b":"c","e":"f"}"""
  assert(removeNullRecursively(json) === expected)
}

test("object with some nested null values") {
  val json = """{"a":{},"b":{"c":null},"d":{"c":"e"},"f":{"c":null,"g":"h"}}"""
  val expected = """{"d":{"c":"e"},"f":{"g":"h"}}"""
  assert(removeNullRecursively(json) === expected)
}

test("array with all null elements") {
  val json = """[null,null,{},{"a":null}]"""
  val expected = "null"
  assert(removeNullRecursively(json) === expected)
}

test("array with some null elements") {
  // TODO: is it an issue if we convert an empty object to null in an array?
  val json = """[null,"a",null,{},"b"]"""
  val expected = """[null,"a",null,null,"b"]"""
  assert(removeNullRecursively(json) === expected)
}
```

---
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r180291486

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType

--- End diff --

Is it necessary to introduce a new DataType? Would it be the same if we use `NullType`? With the flag on, at the end of schema inference, `NullType`, `ArrayType(NullType)`, etc. should be dropped instead of using StringType as fallback. Basically, during schema inference, we keep the one that reveals more details, for example:

```
(NullType, ArrayType(NullType)) => ArrayType(NullType)
(ArrayType(NullType), ArrayType(StructType(Field("a", NullType)))) => ArrayType(StructType(Field("a", NullType)))
```

At the end, we implement a util method that determines whether a field is all null and drops such fields if true. It should be done recursively. I have an internal implementation that implements a similar logic, but on the JSON record itself. You might want to apply it to data types.

```scala
/**
 * Removes null fields recursively from the input JSON record.
 * An array is null if all its elements are null.
 * An object is null if all its values are null.
 */
def removeNullRecursively(jsonStr: String): String = {
  val json = parse(jsonStr)
  val cleaned = doRemoveNullRecursively(json)
  compact(render(cleaned)) // should handle null correctly
}

private def doRemoveNullRecursively(value: JValue): JValue = {
  value match {
    case null => null
    case JNull => null
    case JArray(values) =>
      val cleaned = values.map(doRemoveNullRecursively)
      if (cleaned.exists(_ != null)) {
        JArray(cleaned)
      } else {
        null
      }
    case JObject(pairs) =>
      val cleaned = pairs.flatMap { case (k, v) =>
        val cv = doRemoveNullRecursively(v)
        if (cv != null) {
          Some((k, cv))
        } else {
          None
        }
      }
      if (cleaned.nonEmpty) {
        JObject(cleaned)
      } else {
        null
      }
    // all other types are non-null
    case _ => value
  }
}
```

---
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r180291939

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -887,6 +887,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
+
+  val IGNORE_NULL_FIELDS_STREAMING_SCHEMA_INFERENCE =

--- End diff --

* I was expecting a configuration for the JSON data source instead of a Spark SQL conf. Say, how would we implement this feature if the data source lived outside Spark?
* Since Spark SQL cannot control the behavior of external data source implementations, the flag won't work unless external data sources recognize it. This is another reason to put it under the JSON data source.

---
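To illustrate the data-source-option approach over a `SQLConf`: each source parses its own flag from the per-read parameters map, so an external implementation can support the same option independently of Spark SQL. The class and option names below are assumptions made for illustration, in the rough style of `JSONOptions`:

```scala
// Hypothetical options holder for the JSON data source: the flag comes from
// the per-read parameters map, not from a global Spark SQL conf.
class JsonInferenceOptions(parameters: Map[String, String]) {
  // Case-insensitive lookup, mimicking how Spark treats option keys.
  private val lower: Map[String, String] =
    parameters.map { case (k, v) => (k.toLowerCase, v) }

  // Drop a field during schema inference only when all of its values are null.
  val dropFieldIfAllNull: Boolean =
    lower.get("dropfieldifallnull").map(_.toBoolean).getOrElse(false)
}
```

Usage sketch: `new JsonInferenceOptions(Map("dropFieldIfAllNull" -> "true")).dropFieldIfAllNull` yields `true`, and the flag defaults to `false` when absent.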
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r178415303

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/TypePlaceholder.scala ---
+package org.apache.spark.sql.types
+
+/**
+ * An internal type that is not yet available and will be replaced by an actual type later.
+ */
+case object TypePlaceholder extends StringType

--- End diff --

I just added this weird type for this prototype, so we need to consider it more here. I don't have a better idea now, so I'd welcome any suggestions about this.

---
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r177925982

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -624,6 +624,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 }
+
+  test("SPARK-23772 Ignore column of all null values or empty array during JSON schema inference") {

--- End diff --

@mengxr Does this test match the intention you described in the JIRA? (I just want to confirm this before I brush up the code.)

---
[GitHub] spark pull request #20929: [SPARK-23772][SQL][WIP] Provide an option to igno...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/20929

[SPARK-23772][SQL][WIP] Provide an option to ignore column of all null values or empty array during JSON schema inference

## What changes were proposed in this pull request?

This PR adds a SQL option `spark.sql.streaming.schemaInference.ignoreNullFields` to ignore columns of all null values or empty arrays during JSON schema inference. When non-null values appear in subsequent batches, the column's type is resolved by using those values.

## How was this patch tested?

Added tests in `FileStreamSourceSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-23772

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20929.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20929

commit 876da84a7da9dbdc408e153b9e3dc17776a0c9db
Author: Takeshi Yamamuro
Date: 2018-03-28T14:00:09Z

    WIP

---