I am working on a Structured Streaming application where I read from Kafka as a stream, and for each micro-batch I need to look up a file in S3 (which is nearly 200 GB) to fetch some attributes. So I am using df.persist() (basically caching the lookup), but I need to refresh the DataFrame because the S3 lookup data changes frequently. I am using the code below:


    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

    class RefreshcachedDF(sparkSession: SparkSession) extends StreamingQueryListener {

        override def onQueryStarted(event: QueryStartedEvent): Unit = {}
        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

        override def onQueryProgress(event: QueryProgressEvent): Unit = {
            val currTime = System.currentTimeMillis()
            // latestRefreshTime: the cache expiry timestamp I store in a global temp view
            // (reading it from the view is omitted here)
            if (currTime > latestRefreshTime) {
                // oldDF is a cached DataFrame created from a global temp view; it is about 150 GB
                oldDF.unpersist() // I guess this is an async call; should I use unpersist(true), which is blocking? And is it safe?
                val inputDf: DataFrame = readFile(spec, sparkSession)
                val recreatedDf = inputDf.persist()
                val count = recreatedDf.count() // force materialization of the new cache
            }
        }
    }

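For context, this is roughly how the lookup cache, the stream-static join, and the listener fit together. The broker address, topic name, join key, sink, and checkpoint path below are placeholders, and readFile/spec are my own helpers:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-s3-lookup").getOrCreate()

    // Static lookup side: read the ~200 GB S3 data, expose it as a global temp view, and cache it.
    val lookupDf = readFile(spec, spark)                  // readFile/spec are my own helpers
    lookupDf.createOrReplaceGlobalTempView("lookup")
    lookupDf.persist()

    // Streaming side: Kafka source joined against the cached lookup (a stream-static join).
    val kafkaDf = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092") // placeholder
        .option("subscribe", "events")                    // placeholder topic
        .load()

    val events = kafkaDf.selectExpr("CAST(value AS STRING) AS lookup_key") // placeholder parsing
    val enriched = events.join(lookupDf, Seq("lookup_key"))                // placeholder join key

    // Register the listener so onQueryProgress fires after every micro-batch progress event.
    spark.streams.addListener(new RefreshcachedDF(spark))

    val query = enriched.writeStream
        .format("console")                                               // placeholder sink
        .option("checkpointLocation", "/tmp/checkpoints/lookup-job")     // placeholder path
        .start()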

Is the above approach a good way to refresh the cached DataFrame? The trigger for the refresh is the cache expiry date for the S3 data, which I store in a global temp view.

Note: S3 is one lookup source, but I also have other sources with data sizes of 20 to 30 GB.

 - So the question is: is this the right place to refresh the cached DataFrame?
 - If yes, should I use the blocking or the non-blocking unpersist method, given that the data is huge (150 GB)? (There is a short unpersist sketch below the quoted reply.)
 - For a similar issue I found the below response from Tdas on the Spark user mailing list, with the subject "Re: Refreshing a persisted RDD":

    "Yes, you will have to recreate the streaming Dataframe along with the
    static Dataframe, and restart the query. There isnt a currently
feasible to
    do this without a query restart. But restarting a query WITHOUT
restarting
    the whole application + spark cluster, is reasonably fast. If your
    applicatoin can tolerate 10 second latencies, then stopping and
restarting
    a query within the same Spark application is a reasonable solution."

[http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser]
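
To make the blocking/non-blocking part of the question concrete, this is the difference I am asking about (just the two Dataset.unpersist variants):

    // Non-blocking (default): marks the cached blocks for removal and returns immediately.
    oldDF.unpersist()

    // Blocking: returns only after all cached blocks have actually been removed.
    oldDF.unpersist(blocking = true)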



So if that is the better solution, should I restart the query as below?

    query.processAllAvailable() // drain the data that is already available from the source
    query.stop()
    df.unpersist()
    val inputDf: DataFrame = readFile(spec, sparkSession) // read the file from S3 or any other source
    val recreatedDf = inputDf.persist()
    query.start() // note: StreamingQuery has no start() method; see the sketch below

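Since StreamingQuery itself has no start() method, I assume the "restart" would actually mean re-creating the query from writeStream, something like the sketch below. The sink, checkpoint path, and join are the same placeholders as in the sketch above; sparkSession, spec, and readFile are my own:

    // Drain the data that is already available, then stop the running query.
    query.processAllAvailable()
    query.stop()

    // Swap the cached lookup.
    oldDF.unpersist(blocking = true)                  // or unpersist() if non-blocking turns out to be safe
    val refreshedLookup = readFile(spec, sparkSession).persist()
    refreshedLookup.count()                           // force materialization of the new cache

    // Re-create the query from writeStream; reusing the same checkpoint location
    // lets it resume from the last committed Kafka offsets.
    val newQuery = events.join(refreshedLookup, Seq("lookup_key"))   // placeholders from the sketch above
        .writeStream
        .format("console")                                           // placeholder sink
        .option("checkpointLocation", "/tmp/checkpoints/lookup-job") // same placeholder path as before
        .start()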

When I looked into the Spark documentation for these methods:

void processAllAvailable() (the documentation itself says this method is intended for testing):

    Blocks until all available data in the source has been processed and
    committed to the sink. This method is intended for testing. Note that in
    the case of continually arriving data, this method may block forever.
    Additionally, this method is only guaranteed to block until data that has
    been synchronously appended data to a Source prior to invocation (i.e.
    getOffset must immediately reflect the addition).

void stop():

    Stops the execution of this query if it is running. This method blocks
    until the threads performing execution has stopped.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable()

Please suggest a better approach to refresh the cache.
