Hi Michael,

Interestingly, that doesn't quite seem to work for me for some reason. Here is what I have:
Dataset:

    name | id | country
    -----|----|--------
    kant | 1  | usa
    john | 2  | usa

And here is my code:

    Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
    StreamingQuery query = ds.writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();
    query.awaitTermination();

*This works completely fine and I can see the rows on my console.*

Now if I change it to this:

    Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
    Dataset<String> jsonDS = ds.select(to_json(struct(ds.col("*")))).as(Encoders.STRING());
    StreamingQuery query2 = jsonDS.writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();
    query2.awaitTermination();

*I don't see any rows on my console, and I made sure I waited for a while.*
*The moment I change it back to the code above and run it, it works again.*

On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust <mich...@databricks.com> wrote:

> The following will convert the whole row to JSON.
>
>     import org.apache.spark.sql.functions.*
>     df.select(to_json(struct(col("*"))))
>
> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Thanks Ryan! In this case I will have a Dataset<Row>, so is there a way to
>> convert a Row to a JSON string?
>>
>> Thanks
>>
>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>
>>> It's because "toJSON" doesn't support Structured Streaming. The current
>>> implementation converts the Dataset to an RDD, which is not supported
>>> by streaming queries.
>>>
>>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Yes, it is a streaming dataset. So what is the problem with the
>>>> following code?
>>>>
>>>>     Dataset<String> ds = dataset.toJSON().map(() -> { /* some function that returns a string */ });
>>>>     StreamingQuery query = ds.writeStream().start();
>>>>     query.awaitTermination();
>>>>
>>>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung <felixcheun...@hotmail.com> wrote:
>>>>
>>>>> What is newDS?
>>>>> If it is a streaming Dataset/DataFrame (since you have writeStream
>>>>> there), then there seems to be an issue preventing toJSON from working.
>>>>>
>>>>> ------------------------------
>>>>> *From:* kant kodali <kanth...@gmail.com>
>>>>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>>>>> *To:* user @spark
>>>>> *Subject:* Queries with streaming sources must be executed with
>>>>> writeStream.start()
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following code and I am not sure what's wrong with it. I
>>>>> cannot call dataset.toJSON() (which returns a Dataset). I am using Spark
>>>>> 2.2.0, so I am wondering if there is any workaround?
>>>>>
>>>>>     Dataset<String> ds = newDS.toJSON().map(() -> { /* some function that returns a string */ });
>>>>>     StreamingQuery query = ds.writeStream().start();
>>>>>     query.awaitTermination();
>>>>>
>>>>
>>>
>>
>
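For anyone finding this thread later: here is a minimal, self-contained sketch of the to_json(struct(col("*"))) approach Michael suggested, applied to a static (batch) Dataset so it can be run without a Kafka source. The Person bean class and the column values are illustrative stand-ins mirroring the example table above, not anything from the original poster's job:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ToJsonExample {

    // Bean class used only for this illustration; Spark derives the schema
    // from its getters.
    public static class Person implements Serializable {
        private final String name;
        private final int id;
        private final String country;

        public Person(String name, int id, String country) {
            this.name = name;
            this.id = id;
            this.country = country;
        }

        public String getName() { return name; }
        public int getId() { return id; }
        public String getCountry() { return country; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("to_json-example")
                .getOrCreate();

        // A static stand-in for the streaming Dataset<Row> in the thread.
        Dataset<Row> ds = spark.createDataFrame(
                Arrays.asList(
                        new Person("kant", 1, "usa"),
                        new Person("john", 2, "usa")),
                Person.class);

        // Pack all columns of each row into a struct, serialize the struct
        // to a JSON string, and decode the result as Dataset<String>.
        Dataset<String> jsonDS = ds
                .select(to_json(struct(col("*"))))
                .as(Encoders.STRING());

        jsonDS.show(false); // prints one JSON string per row

        spark.stop();
    }
}
```

One thing worth noting about the earlier toJSON().map(...) snippets in the thread: in the Java API, Dataset.map takes a MapFunction plus an explicit Encoder (e.g. ds.map(f, Encoders.STRING())), so those snippets would not compile as written even outside a streaming context.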