Thanks Cody for the pointer. I am able to do this now. I am not using
checkpointing; instead I am storing offsets in ZooKeeper for fault
tolerance, and Spark config changes are now reflected on code deployment.

Using this API:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

instead of:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
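For anyone following the thread, the ZooKeeper-offsets approach can be sketched roughly as below. This is a minimal sketch, not the exact code from the thread: the "topic:partition:offset" encoding and the znode path are assumptions for illustration, and the Spark/Kafka calls are shown as comments since they need a running cluster and the spark-streaming-kafka dependency.

```scala
// Minimal sketch of keeping direct-stream offsets in ZooKeeper instead of
// relying on Spark checkpointing. The serialization format below
// ("topic:partition:offset" entries joined by commas) is an assumption.

object ZkOffsetCodec {
  // Encode offsets as a single string suitable for storing as the
  // payload of one znode.
  def encode(offsets: Map[(String, Int), Long]): String =
    offsets.map { case ((topic, part), off) => s"$topic:$part:$off" }.mkString(",")

  // Decode the znode payload back into the fromOffsets map that
  // KafkaUtils.createDirectStream expects (keyed by TopicAndPartition in
  // the real API; a (String, Int) pair stands in for it here).
  def decode(s: String): Map[(String, Int), Long] =
    s.split(",").map { entry =>
      val Array(topic, part, off) = entry.split(":")
      (topic, part.toInt) -> off.toLong
    }.toMap
}

// In the actual job (readZnode is a hypothetical ZooKeeper helper):
//   val fromOffsets: Map[TopicAndPartition, Long] =
//     ZkOffsetCodec.decode(readZnode("/consumers/myapp/offsets"))
//       .map { case ((t, p), o) => TopicAndPartition(t, p) -> o }
//   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
//     StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
// After each successful batch, write ZkOffsetCodec.encode(latestOffsets)
// back to the znode so a restart resumes from the committed position.
```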
One quick question: what is the need for checkpointing if we can achieve
both fault tolerance and application code/config changes without it? Is
there anything else that checkpointing gives? I might be missing something.

Regards,
Chandan

On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah the solutions are outlined in the doc link. Or just don't rely on
> checkpoints.
>
> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaran...@gmail.com>
> wrote:
>
>> Yes,
>> I looked into the source code implementation. sparkConf is serialized
>> and saved during checkpointing and re-created from the checkpoint
>> directory at the time of restart. So any sparkConf parameter which you
>> load from application.config and set on the sparkConf object in code
>> cannot be changed and reflected with checkpointing. :(
>>
>> Is there any workaround for reading a changed sparkConf parameter value
>> while using checkpointing?
>> P.S. I am not adding a new parameter, I am just changing the values of
>> some existing sparkConf params.
>>
>> This is a common case and there must be some solution for this.
>>
>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Checkpointing is not Kafka-specific. It encompasses metadata about the
>>> application. You can't re-use a checkpoint if your application has
>>> changed.
>>>
>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>>>
>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
>>> chandanbaran...@gmail.com> wrote:
>>>
>>>> Is it possible to use the checkpoint directory to restart streaming
>>>> but with a modified parameter value in the config file (e.g.
>>>> username/password for the db connection)?
>>>> Thanks in advance.
>>>>
>>>> Regards,
>>>> Chandan
>>>>
>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>>>> chandanbaran...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am using direct Kafka with checkpointing of offsets, same as:
>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>>>
>>>>> I need to change some parameters like the db connection params:
>>>>> username/password for the db connection.
>>>>> I stopped streaming gracefully, changed the parameters in the config
>>>>> file, and restarted streaming.
>>>>> Issue: the changed username/password parameters are not being
>>>>> considered.
>>>>>
>>>>> Question:
>>>>> As per my understanding, checkpointing should only save the offsets
>>>>> of Kafka partitions and not the credentials of the db connection.
>>>>> Why is it picking up the old db connection params?
>>>>>
>>>>> I am declaring the params in the main method and not in the
>>>>> setupSsc() method.
>>>>> My code is identical to that in the above program link, as below:
>>>>>
>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>> val jdbcUser = conf.getString("jdbc.user")
>>>>> val jdbcPassword = conf.getString("jdbc.password")
>>>>> // while the job doesn't strictly need checkpointing,
>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case of failure
>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>> val ssc = StreamingContext.getOrCreate(
>>>>>   checkpointDir,
>>>>>   setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, jdbcUser,
>>>>>     jdbcPassword, checkpointDir) _
>>>>> )
>>>>>
>>>>> --
>>>>> Chandan Prakash

-- 
Chandan Prakash
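To make the getOrCreate behaviour discussed above concrete: the setup function passed to StreamingContext.getOrCreate is invoked only when no checkpoint exists, so credentials captured in a checkpoint shadow a freshly edited config file. A plain-Scala analogue of that logic (the names here are illustrative, not Spark's API) is:

```scala
// Plain-Scala analogue of StreamingContext.getOrCreate: the creating
// function runs ONLY when no checkpoint is present, which is why config
// values serialized into a checkpoint win over freshly loaded config.

case class Ctx(jdbcUser: String, jdbcPassword: String)

object GetOrCreateDemo {
  // `checkpoint` simulates the serialized context on disk; `creatingFunc`
  // simulates setupSsc(...) reading the current config file.
  def getOrCreate(checkpoint: Option[Ctx], creatingFunc: () => Ctx): Ctx =
    checkpoint.getOrElse(creatingFunc())
}

// First run: no checkpoint, so the config file is read.
// Restart after editing the config: the checkpointed Ctx is returned
// and the new credentials are never seen.
```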