Hi! Thanks for the response. It looks like from_json requires the schema ahead of time. Is there any function I can use to infer the schema from the JSON messages I am receiving through Kafka? I tried the code below, but I get the following exception.
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

// code
// datasetRows is of type Dataset<Row> that I get from loading from Kafka
val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON
val foo = ds.select("*").count()
val query = foo.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

I am just trying to parse the JSON messages from Kafka into a DataFrame or Dataset without requiring the schema, and then do a simple count. Thanks!

On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> I understand the confusion. The "json" format is for JSON-encoded files being
> written in a directory. For Kafka, use the "kafka" format. Then, to decode the
> binary data as JSON, you can use the function "from_json" (Spark 2.1 and
> above). Here is our blog post on this.
>
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> And my talk also explains this.
>
> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>
> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> What is the difference between sparkSession.readStream.format("kafka")
>> vs sparkSession.readStream.format("json")?
>> I am sending JSON-encoded messages in Kafka and I am not sure which one
>> of the above I should use.
>>
>> Thanks!
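For reference, one common workaround (a sketch, not something from this thread): do a one-off *batch* read of the topic to let Spark's batch JSON reader infer the schema, then pass that schema to from_json on the stream. Also note that Dataset.count() is a batch action, which is what triggers the AnalysisException above; a streaming count has to be expressed as an aggregation such as groupBy().count(). The SparkSession name `spark`, the broker address, and the topic name "my-topic" are placeholders, and reading a Dataset[String] with spark.read.json assumes Spark 2.2+.

```scala
// Sketch: infer the schema from a sampled batch, then stream with from_json.
// Assumes a running Kafka broker and a SparkSession named `spark`
// (e.g. in spark-shell); "my-topic" is a placeholder topic name.
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// 1. One-off batch read of the topic, just to sample some messages.
val sample = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// 2. Let the batch JSON reader infer the schema from the sample (Spark 2.2+).
val schema = spark.read.json(sample).schema

// 3. Use the inferred schema with from_json on the actual stream.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema).as("data"))

// 4. A streaming count must be an aggregation, not Dataset.count().
val counts = stream.groupBy().count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()
```

The caveat with this approach is that the schema is frozen at the moment of sampling; if producers later add fields, the stream has to be restarted with a re-inferred schema.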