[Spark] How to find which type of key is illegal during from_json() function
*Component*: Spark *Level*: Advanced *Scenario*: How-to

*Problem description*: I have a nested JSON string in one field of a Spark dataframe, and I would like to use from_json() to parse the JSON object. In particular, if the type of one key does not match our predefined struct type, from_json() returns null. Given that, can we find out which key has the wrong type? Example:

*source dataframe:*

| original_json_string |
| -- |
| "{a:{b:"dn", c:"test"}}" |

P.S. We expect the value of b to be a double, so we predefined a struct type for from_json() to use, but it just returns null:

*result dataframe after from_json:*

| original_json_string |
| -- |
| null |

In this sample, the value of a has two keys, b and c. Can we learn that the type of b (rather than c) is wrong? That would let me check the data quickly instead of just getting null back. If we want to achieve this, how could we implement it? If you have any ideas, I would appreciate them, thank you.

--
Best Regards,
Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com
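[Editorial note] from_json() itself does not report which key failed; it nulls the whole row. One way to get per-key diagnostics is to pre-validate records outside from_json, e.g. in a plain script or a UDF. A minimal sketch of that validation logic in plain Python (the expected-type map mirrors the struct type from the example; names are illustrative, not Spark API):

```python
import json

# Map each (possibly nested) key to the Python type expected after json.loads.
# This mirrors a Spark StructType like a: struct<b: double, c: string>.
EXPECTED = {"a": {"b": float, "c": str}}

def find_type_errors(obj, expected, path=""):
    """Walk a parsed JSON object and return the paths whose value type
    does not match the expected type."""
    errors = []
    for key, exp in expected.items():
        full = f"{path}.{key}" if path else key
        if key not in obj:
            errors.append(f"{full}: missing")
        elif isinstance(exp, dict):
            # Nested struct: recurse if the value is an object, else report it.
            if isinstance(obj[key], dict):
                errors.extend(find_type_errors(obj[key], exp, full))
            else:
                errors.append(f"{full}: expected object, got {type(obj[key]).__name__}")
        elif not isinstance(obj[key], exp):
            errors.append(f"{full}: expected {exp.__name__}, got {type(obj[key]).__name__}")
    return errors

record = json.loads('{"a": {"b": "dn", "c": "test"}}')
print(find_type_errors(record, EXPECTED))  # ['a.b: expected float, got str']
```

This pinpoints b (not c) as the offending key, which is exactly the diagnostic from_json withholds when it returns null.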
Re: Corrupt record handling in spark structured streaming and from_json function
Dear Spark user community,

I have received some insight regarding filtering separate dataframes in my spark-structured-streaming job. However, I now wish to write each of the dataframes mentioned in the Stack Overflow question below to a separate location using a parquet writer. My initial impression is that this requires multiple sinks, but I'm being pressured against that. I think it might also be possible using the foreach / foreachBatch writers, but I'm not sure how that interacts with the parquet writer, or what the caveats of that approach are. Can some more advanced users or developers suggest how to go about this, particularly without using multiple streams?

On Wed, Dec 26, 2018 at 6:01 PM Colin Williams wrote:
>
> https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming
>
> On Wed, Dec 26, 2018 at 2:42 PM Colin Williams wrote:
> >
> > From my initial impression it looks like I'd need to create my own
> > `from_json` using `jsonToStructs` as a reference, but try to handle
> > `case _: BadRecordException => null` or similar to try to write the
> > non-matching string to a corrupt-records column
> >
> > On Wed, Dec 26, 2018 at 1:55 PM Colin Williams wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to figure out how I can write records that don't match a
> > > JSON read schema via Spark structured streaming to an output sink /
> > > parquet location. Previously I did this in batch via the corrupt-column
> > > features of batch reads. But in this Spark structured streaming job I'm
> > > reading a string from Kafka and using from_json on the value of that
> > > string. If it doesn't match my schema then from_json returns null for
> > > all the rows and does not populate a corrupt-record column. But I want
> > > to somehow obtain the source Kafka string in a dataframe and write it
> > > to an output sink / parquet location.
> > >
> > > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> > >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> > >   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> > > }

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
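[Editorial note] The splitting logic that would run inside a single foreachBatch handler can be sketched in plain Python (the field names and expected types here are hypothetical stand-ins for the real schema; this illustrates the routing decision only, not Spark's API): each raw string either parses against the expected fields and goes to the "events" output, or is kept verbatim for a separate "corrupt" output, so one stream can feed two parquet writers from one batch function.

```python
import json

# Hypothetical stand-in for the real StructType.
EXPECTED_FIELDS = {"id": int, "name": str}

def split_batch(raw_values):
    """Split raw Kafka string values into parsed events and corrupt records,
    the way a foreachBatch handler might before writing to two locations."""
    events, corrupt = [], []
    for raw in raw_values:
        try:
            obj = json.loads(raw)
            if all(isinstance(obj.get(k), t) for k, t in EXPECTED_FIELDS.items()):
                events.append(obj)
            else:
                corrupt.append(raw)  # keep the original string for inspection
        except json.JSONDecodeError:
            corrupt.append(raw)      # not JSON at all: also a corrupt record
    return events, corrupt

batch = ['{"id": 1, "name": "ok"}', '{"id": "oops"}', "not json at all"]
events, corrupt = split_batch(batch)
print(events)   # [{'id': 1, 'name': 'ok'}]
print(corrupt)  # ['{"id": "oops"}', 'not json at all']
```

In a foreachBatch handler, each list would instead be a filtered dataframe written with its own parquet writer to its own path.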
Re: Corrupt record handling in spark structured streaming and from_json function
https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming

On Wed, Dec 26, 2018 at 2:42 PM Colin Williams wrote:
>
> From my initial impression it looks like I'd need to create my own
> `from_json` using `jsonToStructs` as a reference, but try to handle
> `case _: BadRecordException => null` or similar to try to write the
> non-matching string to a corrupt-records column
>
> On Wed, Dec 26, 2018 at 1:55 PM Colin Williams wrote:
> >
> > Hi,
> >
> > I'm trying to figure out how I can write records that don't match a
> > JSON read schema via Spark structured streaming to an output sink /
> > parquet location. Previously I did this in batch via the corrupt-column
> > features of batch reads. But in this Spark structured streaming job I'm
> > reading a string from Kafka and using from_json on the value of that
> > string. If it doesn't match my schema then from_json returns null for
> > all the rows and does not populate a corrupt-record column. But I want
> > to somehow obtain the source Kafka string in a dataframe and write it
> > to an output sink / parquet location.
> >
> > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> >   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> > }
Re: Corrupt record handling in spark structured streaming and from_json function
From my initial impression it looks like I'd need to create my own `from_json` using `jsonToStructs` as a reference, but try to handle `case _: BadRecordException => null` or similar to try to write the non-matching string to a corrupt-records column

On Wed, Dec 26, 2018 at 1:55 PM Colin Williams wrote:
>
> Hi,
>
> I'm trying to figure out how I can write records that don't match a
> JSON read schema via Spark structured streaming to an output sink /
> parquet location. Previously I did this in batch via the corrupt-column
> features of batch reads. But in this Spark structured streaming job I'm
> reading a string from Kafka and using from_json on the value of that
> string. If it doesn't match my schema then from_json returns null for
> all the rows and does not populate a corrupt-record column. But I want
> to somehow obtain the source Kafka string in a dataframe and write it
> to an output sink / parquet location.
>
> def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
>   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
>   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> }
Corrupt record handling in spark structured streaming and from_json function
Hi,

I'm trying to figure out how I can write records that don't match a JSON read schema via Spark structured streaming to an output sink / parquet location. Previously I did this in batch via the corrupt-column features of batch reads. But in this Spark structured streaming job I'm reading a string from Kafka and using from_json on the value of that string. If it doesn't match my schema then from_json returns null for all the rows and does not populate a corrupt-record column. But I want to somehow obtain the source Kafka string in a dataframe and write it to an output sink / parquet location.

def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
  val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
  jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
}
Re: from_json function
Maxim, thanks for your reply. I've left a comment on the following JIRA issue: https://issues.apache.org/jira/browse/SPARK-23194?focusedCommentId=16582025&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16582025

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
from_json schema order
Hi, Can someone confirm whether ordering matters between the schema and underlying JSON string? Thanks, Brandon
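[Editorial note] As far as the JSON side goes, ordering should not matter: a JSON object is an unordered set of name/value pairs, and from_json looks struct fields up by name, not position. At the JSON level this is easy to see in plain Python:

```python
import json

# The same logical record with keys in different orders.
a = json.loads('{"number": 1, "label": "x"}')
b = json.loads('{"label": "x", "number": 1}')

# The parsed objects compare equal: lookup is by name, not position.
print(a == b)  # True
```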
Re: from_json function
Hello Denis,

The from_json function supports only the fail-fast mode, see:
https://github.com/apache/spark/blob/e2ab7deae76d3b6f41b9ad4d0ece14ea28db40ce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L568

Your setting "mode" -> "PERMISSIVE" will be overwritten.

On Wed, Aug 15, 2018 at 4:52 PM dbolshak wrote:
> Hello community,
>
> I cannot manage to run the from_json method with the "columnNameOfCorruptRecord" option.
> ```
> import org.apache.spark.sql.functions._
>
> val data = Seq(
>   "{'number': 1}",
>   "{'number': }"
> )
>
> val schema = new StructType()
>   .add($"number".int)
>   .add($"_corrupt_record".string)
>
> val sourceDf = data.toDF("column")
>
> val jsonedDf = sourceDf
>   .select(from_json(
>     $"column",
>     schema,
>     Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_corrupt_record")
>   ) as "data").selectExpr("data.number", "data._corrupt_record")
>
> jsonedDf.show()
> ```
> Can anybody help me get `_corrupt_record` non-empty?
>
> Thanks in advance.

--
Maxim Gekk
Technical Solutions Lead
Databricks Inc.
maxim.g...@databricks.com
databricks.com
from_json function
Hello community,

I cannot manage to run the from_json method with the "columnNameOfCorruptRecord" option.
```
import org.apache.spark.sql.functions._

val data = Seq(
  "{'number': 1}",
  "{'number': }"
)

val schema = new StructType()
  .add($"number".int)
  .add($"_corrupt_record".string)

val sourceDf = data.toDF("column")

val jsonedDf = sourceDf
  .select(from_json(
    $"column",
    schema,
    Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_corrupt_record")
  ) as "data").selectExpr("data.number", "data._corrupt_record")

jsonedDf.show()
```
Can anybody help me get `_corrupt_record` non-empty?

Thanks in advance.
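[Editorial note] The behaviour the poster is after — bad rows preserved in a corrupt-record column rather than the whole struct nulled — can be sketched in plain Python. This only illustrates the intended output shape, not Spark's actual from_json (which, per the reply above, forces fail-fast and ignores the option); the field names follow the example, but the inputs use double-quoted JSON since the standard parser rejects single quotes:

```python
import json

def parse_permissive(raw):
    """Return a row dict with either the parsed 'number' field populated
    or the raw string preserved in '_corrupt_record'."""
    try:
        obj = json.loads(raw)
        return {"number": obj.get("number"), "_corrupt_record": None}
    except json.JSONDecodeError:
        return {"number": None, "_corrupt_record": raw}

rows = [parse_permissive(s) for s in ['{"number": 1}', '{"number": }']]
print(rows)
# [{'number': 1, '_corrupt_record': None},
#  {'number': None, '_corrupt_record': '{"number": }'}]
```

In Spark at the time, the practical workaround was to do this with the JSON datasource reader (which does honour PERMISSIVE mode) rather than the from_json expression.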
Re: How to handle java.sql.Date inside Maps with to_json / from_json
Hi all,

I tested this with a Date outside a map and it works fine, so I think the issue is only for Dates inside Maps. I will create a JIRA for this unless there are objections.

Best regards,
Patrick

On Thu, 28 Jun 2018, 11:53 Patrick McGloin, wrote:
> Consider the following test, which will fail on the final show:
>
> case class UnitTestCaseClassWithDateInsideMap(map: Map[Date, Int])
>
> test("Test a Date as key in a Map") {
>   val map = UnitTestCaseClassWithDateInsideMap(Map(Date.valueOf("2018-06-28") -> 1))
>   val options = Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss.SSS", "dateFormat" -> "yyyy/MM/dd")
>   val schema = Encoders.product[UnitTestCaseClassWithDateInsideMap].schema
>
>   val mapDF = Seq(map).toDF()
>   val jsonDF = mapDF.select(to_json(struct(mapDF.columns.head, mapDF.columns.tail:_*), options))
>   jsonDF.show()
>
>   val jsonString = jsonDF.map(_.getString(0)).collect().head
>
>   val stringDF = Seq(jsonString).toDF("json")
>   val parsedDF = stringDF.select(from_json($"json", schema, options))
>   parsedDF.show()
> }
>
> The result of the line "jsonDF.show()" is as follows:
>
> +---------------------------------------------------+
> |structstojson(named_struct(NamePlaceholder(), map))|
> +---------------------------------------------------+
> |                                {"map":{"17710":1}}|
> +---------------------------------------------------+
>
> As can be seen, the date is not formatted correctly. The error with "parsedDF.show()" is:
>
> java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
>
> I have tried adding the options to to_json / from_json but it hasn't helped. Am I using the wrong options?
>
> Is there another way to do this?
>
> Best regards,
> Patrick

This message has been sent by ABN AMRO Bank N.V., which has its seat at Gustav Mahlerlaan 10 (1082 PP) Amsterdam, the Netherlands, and is registered in the Commercial Register of Amsterdam under number 34334259.
How to handle java.sql.Date inside Maps with to_json / from_json
Consider the following test, which will fail on the final show:

case class UnitTestCaseClassWithDateInsideMap(map: Map[Date, Int])

test("Test a Date as key in a Map") {
  val map = UnitTestCaseClassWithDateInsideMap(Map(Date.valueOf("2018-06-28") -> 1))
  val options = Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss.SSS", "dateFormat" -> "yyyy/MM/dd")
  val schema = Encoders.product[UnitTestCaseClassWithDateInsideMap].schema

  val mapDF = Seq(map).toDF()
  val jsonDF = mapDF.select(to_json(struct(mapDF.columns.head, mapDF.columns.tail:_*), options))
  jsonDF.show()

  val jsonString = jsonDF.map(_.getString(0)).collect().head

  val stringDF = Seq(jsonString).toDF("json")
  val parsedDF = stringDF.select(from_json($"json", schema, options))
  parsedDF.show()
}

The result of the line "jsonDF.show()" is as follows:

+---------------------------------------------------+
|structstojson(named_struct(NamePlaceholder(), map))|
+---------------------------------------------------+
|                                {"map":{"17710":1}}|
+---------------------------------------------------+

As can be seen, the date is not formatted correctly. The error with "parsedDF.show()" is:

java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer

I have tried adding the options to to_json / from_json but it hasn't helped. Am I using the wrong options?

Is there another way to do this?

Best regards,
Patrick
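[Editorial note] The "17710" in the to_json output is the Date map key rendered as days since the Unix epoch rather than with the requested dateFormat, which a quick plain-Python check confirms:

```python
from datetime import date, timedelta

# 17710 is 2018-06-28 expressed as days since the Unix epoch (1970-01-01),
# which is how the Date map key is being serialized instead of "2018/06/28".
epoch = date(1970, 1, 1)
print(epoch + timedelta(days=17710))  # 2018-06-28
```

So the key survives the round trip as the string "17710", and from_json then fails trying to interpret the row against the expected types — consistent with the UTF8String ClassCastException above.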
Re: pyspark + from_json(col("col_name"), schema) returns all null
Hi,

Not that I'm aware of, but in your case, checking whether a JSON message fits your schema and the pipeline would have taken pyspark alone with JSONs on disk, wouldn't it?

Regards,
Jacek Laskowski
https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 12:49 AM, salemi wrote:
> I found the root cause! There was a mismatch between the StructField type
> and the json message.
>
> Is there a good write-up / wiki out there that describes how to debug Spark
> jobs?
>
> Thanks
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
Re: pyspark + from_json(col("col_name"), schema) returns all null
I found the root cause! There was a mismatch between the StructField type and the json message.

Is there a good write-up / wiki out there that describes how to debug Spark jobs?

Thanks
pyspark + from_json(col("col_name"), schema) returns all null
Hi All,

I am using pyspark, consuming messages from Kafka, and when I .select(from_json(col("col_name"), schema)) the returned values are all null. I looked at the JSON messages and they are valid strings. Any ideas?
RE: from_json()
Hey Sam,

Nope – it does not work the way I want. I guess it only works with one type…

Trying to convert:

{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1,"authorId":1,"title":"Fantastic Beasts and Where to Find Them: The Original Screenplay"}

I get:

[Executor task launch worker for task 3:ERROR] Logging$class: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: Failed to convert the JSON string '{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1,"authorId":1,"title":"Fantastic Beasts and Where to Find Them: The Original Screenplay"}' to a data type.
	at org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:176)
	at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:108)
	at org.apache.spark.sql.types.DataType.fromJson(DataType.scala)
	at net.jgp.labs.spark.l250_map.l031_dataset_book_json_in_progress.CsvToDatasetBookAsJson$BookMapper.call(CsvToDatasetBookAsJson.java:44)
	at net.jgp.labs.spark.l250_map.l031_dataset_book_json_in_progress.CsvToDatasetBookAsJson$BookMapper.call(CsvToDatasetBookAsJson.java:1)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

From: JG Perrin [mailto:jper...@lumeris.com]
Sent: Monday, August 28, 2017 1:29 PM
To: Sam Elamin <hussam.ela...@gmail.com>
Cc: user@spark.apache.org
Subject: RE: from_json()

Thanks Sam – this might be the solution. I will investigate!

From: Sam Elamin [mailto:hussam.ela...@gmail.com]
Sent: Monday, August 28, 2017 1:14 PM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: from_json()

Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new schema from a df, it's fairly simple, assuming you have a schema already predefined or in a string, i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema:

sqlContext.createDataFrame(oldDF.rdd, newSchema)

Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin <jper...@lumeris.com> wrote:
Is there a way to not have to specify a schema when using from_json(), or to infer the schema? When you read a JSON doc from disk, you can infer the schema. Should I write it to disk before (ouch)?

jg

This electronic transmission and any documents accompanying this electronic transmission contain confidential information belonging to the sender. This information may contain confidential health information that is legally privileged. The information is intended only for the use of the individual or entity named above. The authorized recipient of this transmission is prohibited from disclosing this information to any other party unless required to do so by law or regulation and is required to delete or destroy the information after its stated need has been fulfilled. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution or the taking of any action in reliance on or regarding the contents of this electronically transmitted information is strictly prohibited. If you have received this E-mail in error, please notify the sender and delete this message immediately.
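[Editorial note] The exception above arises because DataType.fromJson expects a Spark *schema* document (an object like {"type":"struct","fields":[...]}), not a data record such as the book JSON being passed in. A hedged plain-Python guard illustrating the difference (the schema string below is a hypothetical example of the schema-JSON shape, not taken from the thread):

```python
import json

def looks_like_spark_schema(s):
    """Rough check: a Spark schema JSON is an object with type == 'struct'
    and a 'fields' list -- a data record is not."""
    try:
        obj = json.loads(s)
    except json.JSONDecodeError:
        return False
    return isinstance(obj, dict) and obj.get("type") == "struct" and isinstance(obj.get("fields"), list)

# A hypothetical schema document of the kind DataType.fromJson accepts...
schema_json = '{"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}}]}'
# ...versus the data record that was actually being passed in.
book_record = '{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1}'

print(looks_like_spark_schema(schema_json))  # True
print(looks_like_spark_schema(book_record))  # False
```

Feeding the record where the schema belongs is exactly what produces "Failed to convert the JSON string ... to a data type".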
RE: from_json()
Thanks Sam – this might be the solution. I will investigate!

From: Sam Elamin [mailto:hussam.ela...@gmail.com]
Sent: Monday, August 28, 2017 1:14 PM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: from_json()

Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new schema from a df, it's fairly simple, assuming you have a schema already predefined or in a string, i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema:

sqlContext.createDataFrame(oldDF.rdd, newSchema)

Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin <jper...@lumeris.com> wrote:
Is there a way to not have to specify a schema when using from_json(), or to infer the schema? When you read a JSON doc from disk, you can infer the schema. Should I write it to disk before (ouch)?

jg
Re: from_json()
Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new schema from a df, it's fairly simple, assuming you have a schema already predefined or in a string, i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema:

sqlContext.createDataFrame(oldDF.rdd, newSchema)

Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin <jper...@lumeris.com> wrote:
> Is there a way to not have to specify a schema when using from_json(), or
> to infer the schema? When you read a JSON doc from disk, you can infer the
> schema. Should I write it to disk before (ouch)?
>
> jg
from_json()
Is there a way to not have to specify a schema when using from_json(), or to infer the schema? When you read a JSON doc from disk, you can infer the schema. Should I write it to disk before (ouch)?

jg