Yes, I looked into the source code. The SparkConf is serialized and saved during checkpointing, and it is re-created from the checkpoint directory at restart. So any SparkConf parameter that you load from application.config and set on the SparkConf object in code cannot be changed afterwards: the new value is not picked up when the job restarts from a checkpoint. :(
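
A toy model of this behaviour, in plain Scala with no Spark dependency, may make it easier to see why the changed value is ignored. It is only a sketch: `CheckpointModel`, `ToyContext` and their methods are hypothetical names made up for illustration, not Spark classes, and unlike Spark the model writes its checkpoint at creation time. What it does mirror is the shape of `StreamingContext.getOrCreate`: the creating function runs only when no checkpoint exists, so the settings captured on the first run come back on restart.

```scala
import scala.collection.mutable

// Hypothetical toy model; none of these names exist in Spark.
object CheckpointModel {
  // Stand-in for a streaming context: it only carries its settings.
  final case class ToyContext(settings: Map[String, String])

  // Stand-in for the checkpoint directory: dir -> settings saved at checkpoint time.
  private val checkpoints = mutable.Map.empty[String, Map[String, String]]

  // Save the context's settings under the given directory name.
  def checkpoint(dir: String, ctx: ToyContext): Unit =
    checkpoints(dir) = ctx.settings

  // Restore from the checkpoint if one exists; otherwise call the factory.
  def getOrCreate(dir: String, create: () => ToyContext): ToyContext =
    checkpoints.get(dir) match {
      case Some(saved) =>
        // A checkpoint exists: `create` is never invoked, so new values are ignored.
        ToyContext(saved)
      case None =>
        val ctx = create()
        checkpoint(dir, ctx) // simplification: checkpoint immediately
        ctx
    }

  // First run with the old user, then a "restart" with a changed config value.
  def demo(): (String, String) = {
    val dir = "/tmp/toy-checkpoint" // only used as a map key here
    val firstRun = getOrCreate(dir, () => ToyContext(Map("jdbc.user" -> "old")))
    val restart  = getOrCreate(dir, () => ToyContext(Map("jdbc.user" -> "new")))
    (firstRun.settings("jdbc.user"), restart.settings("jdbc.user"))
  }
}
```

In this model both elements of the pair returned by `CheckpointModel.demo()` are `"old"`, which matches what I see with the real job: the restart never looks at the new value.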
Is there any workaround for reading a changed SparkConf parameter value while still using checkpointing?
P.S. I am not adding a new parameter; I am just changing the values of some existing SparkConf params.
This is a common case and there must be some solution for it.

On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Checkpointing is not kafka-specific. It encompasses metadata about the
> application. You can't re-use a checkpoint if your application has changed.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
>
> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <chandanbaran...@gmail.com> wrote:
>
>> Is it possible that i use checkpoint directory to restart streaming but
>> with modified parameter value in config file (e.g. username/password for
>> db connection) ?
>> Thanks in advance.
>>
>> Regards,
>> Chandan
>>
>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <chandanbaran...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using direct kafka with checkpointing of offsets same as :
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>
>>> I need to change some parameters like db connection params :
>>> username/password for db connection .
>>> I stopped streaming gracefully ,changed parameters in config file and
>>> restarted streaming.
>>> *Issue : changed parameters username/password are not being considered.*
>>>
>>> *Question* :
>>> As per my understanding , Checkpointing should only save offsets of
>>> kafka partitions and not the credentials of the db connection.
>>> Why its picking old db connection params ?
>>>
>>> I am declaring params in main method and not in setupSsc() method.
>>> My code is identical to that in the above program link as below:
>>>
>>>     val jdbcDriver = conf.getString("jdbc.driver")
>>>     val jdbcUrl = conf.getString("jdbc.url")
>>>     *val jdbcUser = conf.getString("jdbc.user")*
>>>     *val jdbcPassword = conf.getString("jdbc.password")*
>>>     // while the job doesn't strictly need checkpointing,
>>>     // we'll checkpoint to avoid replaying the whole kafka log in case of failure
>>>     val checkpointDir = conf.getString("checkpointDir")
>>>     val ssc = StreamingContext.getOrCreate(
>>>       checkpointDir,
>>>       setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>>         *jdbcPassword*, checkpointDir) _
>>>     )
>>>
>>> --
>>> Chandan Prakash
>>
>> --
>> Chandan Prakash

--
Chandan Prakash