change it to readStream instead of read as below

val df = spark
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")

Check is this helpful


On Wed, Mar 7, 2018 at 7:33 PM Junfeng Chen <> wrote:

> I am struggling in trying to read data in kafka and save them to parquet
> file on hdfs by using spark streaming according to this post
> My code is similar to  following
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .write.parquet("hdfs://data.parquet")
> What the difference is I am writing in Java language.
> But in practice, this code just run once and then exit gracefully.
> Although it produces the parquet file successfully and no any exception is
> threw out , it runs like a normal spark program rather than a spark
> streaming program.
> What should I do if want to read kafka and save them to parquet in batch?
> Regard,
> Junfeng Chen

Reply via email to