Hi!

Thanks for the response. It looks like from_json requires a schema ahead of
time. Is there a function I can use to infer the schema from the JSON
messages I am receiving through Kafka? I tried the code below, but I get
the following exception.

org.apache.spark.sql.AnalysisException: Queries with streaming sources must
be executed with writeStream.start()


//code
// datasetRows is a Dataset<Row> loaded from Kafka
val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON

val foo = ds.select("*").count()
val query = foo.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

I am just trying to parse the JSON messages from Kafka into a DataFrame or
Dataset without supplying a schema, and then do a simple count.
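For reference, this is the shape of what I am after. It is only a sketch:
the broker address and topic name are placeholders, and I am guessing that
the count has to be written as a streaming aggregation (groupBy().count())
rather than the count() action, since actions are what trigger that
AnalysisException on streaming sources.

```scala
import org.apache.spark.sql.SparkSession

object KafkaJsonCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-json-count")
      .master("local[*]")
      .getOrCreate()

    // Placeholder broker and topic names
    val datasetRows = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .load()

    // Kafka delivers the payload as binary; cast it to a string first
    val ds = datasetRows.selectExpr("CAST(value AS STRING)")

    // count() is an action and is not allowed on a streaming Dataset;
    // a global groupBy().count() expresses the same thing as a
    // streaming aggregation, and needs no schema for the JSON payload
    val counts = ds.groupBy().count()

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```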

Thanks!



On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> I understand the confusion. The "json" format is for JSON-encoded files
> being written into a directory. For Kafka, use the "kafka" format. Then,
> to decode the binary data as JSON, you can use the function "from_json"
> (Spark 2.1 and above). Here is our blog post on this.
>
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> And my talk also explains this.
>
> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>
> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> What is the difference between sparkSession.readStream.format("kafka")
>> and sparkSession.readStream.format("json")?
>> I am sending JSON-encoded messages to Kafka, and I am not sure which of
>> the two I should use.
>>
>> Thanks!
>>
>>
>
