Yes, I looked into the source code. The SparkConf is serialized and
saved during checkpointing, and re-created from the checkpoint directory
at restart. So any SparkConf parameter that you load from
application.config and set on the SparkConf object in code cannot be
changed and have the new value take effect when restarting from a
checkpoint.  :(

Is there any workaround for reading a changed SparkConf parameter value
while still using checkpointing?
P.S. I am not adding a new parameter, I am just changing the values of
some existing SparkConf params.

This is a common case and there must be some solution for this.
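
To make the behaviour concrete, here is a minimal sketch that does not use
Spark at all. It uses plain Java serialization as a stand-in for the
checkpoint, and every name in it (ExternalConfig, CapturedSettings,
LookupSettings, CheckpointSketch) is hypothetical. It assumes the only thing
that matters is whether a value is copied into the serialized state or looked
up from a singleton at the point of use. I have not verified that the second
style survives a real checkpoint recovery in Spark Streaming.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for application.config; mutable so the demo can "edit the file".
object ExternalConfig {
  @volatile var jdbcUser: String = "old_user"
}

// Mimics a value captured before the checkpoint is written: it is copied into the object.
final case class CapturedSettings(jdbcUser: String)

// Mimics reading the setting at the point of use: nothing is stored in the object itself.
final case class LookupSettings() {
  def jdbcUser: String = ExternalConfig.jdbcUser
}

object CheckpointSketch {
  // Plain Java serialization, standing in for writing the checkpoint.
  def save(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  // Deserialization, standing in for recovery from the checkpoint directory.
  def restore[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val capturedBytes = save(CapturedSettings(ExternalConfig.jdbcUser))
    val lookupBytes = save(LookupSettings())

    // "Stop the job, edit the config file, restart."
    ExternalConfig.jdbcUser = "new_user"

    val captured = restore[CapturedSettings](capturedBytes)
    val lookup = restore[LookupSettings](lookupBytes)

    println(s"captured: ${captured.jdbcUser}") // still the value from before the edit
    println(s"lookup:   ${lookup.jdbcUser}")   // the value after the edit
  }
}
```

If this carries over to Spark, the values set on the SparkConf itself would
still be frozen, but things like db credentials could perhaps be read from a
singleton inside the output operation instead of being passed in from main.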

On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Checkpointing is not kafka-specific.  It encompasses metadata about the
> application.  You can't re-use a checkpoint if your application has changed.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
>
> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Is it possible to use the checkpoint directory to restart streaming, but
>> with a modified parameter value in the config file (e.g. username/password
>> for the db connection)?
>> Thanks in advance.
>>
>> Regards,
>> Chandan
>>
>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>> chandanbaran...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using the direct Kafka stream with checkpointing of offsets, the same as in:
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>
>>> I need to change some parameters, such as the db connection params
>>> (username/password).
>>> I stopped streaming gracefully, changed the parameters in the config file,
>>> and restarted streaming.
>>> *Issue: the changed username/password parameters are not being picked up.*
>>>
>>> *Question* :
>>> As per my understanding, checkpointing should only save the offsets of
>>> the Kafka partitions and not the credentials of the db connection.
>>> Why is it picking up the old db connection params?
>>>
>>> I am declaring the params in the main method and not in the setupSsc() method.
>>> My code is identical to that in the program linked above, as below:
>>> val jdbcDriver = conf.getString("jdbc.driver")
>>> val jdbcUrl = conf.getString("jdbc.url")
>>> val jdbcUser = conf.getString("jdbc.user")
>>> val jdbcPassword = conf.getString("jdbc.password")
>>> // while the job doesn't strictly need checkpointing,
>>> // we'll checkpoint to avoid replaying the whole kafka log in case of failure
>>> val checkpointDir = conf.getString("checkpointDir")
>>> val ssc = StreamingContext.getOrCreate(
>>>   checkpointDir,
>>>   setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword, checkpointDir) _
>>> )
>>>
>>>
>>>
>>> --
>>> Chandan Prakash
>>>
>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>


-- 
Chandan Prakash
