Hi,

Can you check whether setting the checkpointLocation option would work, in
case you are using Structured Streaming?

Regards,
Gourav Sengupta

On Tue, Jul 24, 2018 at 12:30 PM, John, Vishal (Agoda) <
vishal.j...@agoda.com.invalid> wrote:

>
> Hello all,
>
>
> I have to read data from a Kafka topic at regular intervals. I create the
> DataFrame as shown below. I don't want to start reading from the beginning
> on each run, but at the same time I don't want to miss the messages that
> arrive between runs.
>
> val queryDf = sqlContext
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", hosts)
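>   .option("enable.auto.commit", true)       // not passed to Kafka: only "kafka."-prefixed options are
>   .option("subscribe", topicName)
>   .option("auto.commit.interval.ms", 1000)  // likewise not passed to Kafka
>   .option("startingOffsets", "latest")      // ?? earliest OR latest ("latest" is rejected for batch reads)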
>   .load()
>   .selectExpr("CAST(value AS STRING) as message")
>
> I would like to understand where the offsets are stored, so that I can
> supply them each time the application starts. Alternatively, is there a way
> to specify a custom location in which to store the offsets?
> This is not a streaming application, so I am not sure whether a checkpoint
> directory applies in this case.
>
> Any pointers would be very helpful.
>
>
> thanks,
> Vishal
>
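If the job has to stay a plain batch read, one option is to keep the offsets yourself and pass them back through `startingOffsets`, which for batch queries accepts `"earliest"` or a JSON string such as `{"my-topic":{"0":42,"1":17}}` (`"latest"` is not allowed there). What follows is a minimal, Spark-free sketch under stated assumptions: `OffsetBookkeeping`, its methods, the one-line-per-partition file format and the topic name are all hypothetical, and the topic name is assumed to need no JSON escaping. It turns the (partition, offset) pairs read in one run into the next run's starting positions, keeps partitions that saw no new records at their old position, and persists the result to a file.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

// Hypothetical helper that remembers the "next offset to read" per partition
// between batch runs. Nothing here is a Spark or Kafka API.
object OffsetBookkeeping {

  // Given (partition, offset) pairs of records read in this run, return the
  // offset the next run should start from in each partition (max + 1).
  def nextOffsets(consumed: Seq[(Int, Long)]): Map[Int, Long] =
    consumed.groupBy(_._1).map { case (p, rows) => p -> (rows.map(_._2).max + 1L) }

  // Partitions that received no new records keep their previous position;
  // partitions present in `current` take the newer value.
  def merge(previous: Map[Int, Long], current: Map[Int, Long]): Map[Int, Long] =
    previous ++ current

  // Render offsets in the JSON shape used by the startingOffsets option,
  // e.g. {"my-topic":{"0":42,"1":17}}. Assumes `topic` needs no escaping.
  def toStartingOffsetsJson(topic: String, offsets: Map[Int, Long]): String =
    offsets.toSeq
      .sortBy(_._1)
      .map { case (p, o) => "\"" + p + "\":" + o }
      .mkString("{\"" + topic + "\":{", ",", "}}")

  // Persist as plain "partition,offset" lines (hypothetical file format).
  def save(path: Path, offsets: Map[Int, Long]): Unit = {
    val text = offsets.toSeq.sortBy(_._1).map { case (p, o) => s"$p,$o" }.mkString("\n")
    Files.write(path, text.getBytes(UTF_8))
  }

  // Read the file back; an absent file means "no previous run".
  def load(path: Path): Map[Int, Long] =
    if (!Files.exists(path)) Map.empty
    else
      new String(Files.readAllBytes(path), UTF_8)
        .split("\n")
        .filter(_.nonEmpty)
        .map { line =>
          val parts = line.split(",")
          parts(0).trim.toInt -> parts(1).trim.toLong
        }
        .toMap
}
```

In the Spark job itself, the Kafka source's DataFrame exposes `partition` and `offset` columns (before the `selectExpr` projection drops them), so something like `df.groupBy("partition").agg(max("offset"))` yields one pair per partition to feed into `nextOffsets`. On the first run, when no offsets file exists yet, fall back to `"earliest"`. As far as I know, Spark 2.x also expects that JSON to list every partition of the topic, which is why `merge` keeps the previous entries. Unlike a streaming checkpoint, this is not atomic: if the job fails after writing its output but before saving the file, the next run reads those messages again.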