Re: Spark Streaming application code change and stateful transformations

The reason I'm dismissing the graceful shutdown approach is that if your app crashes and can't be restarted without code changes (e.g. a bug needs to be fixed), you're screwed.

On Thu, Sep 17, 2015 at 3:56 AM, Adrian Tanase wrote:
> Also – I would not dismiss the graceful shutdown approach, since you're
> controlling the shutdown. At a minimum, you can monitor whether it was
> successful; if it failed, you simply restart the app, relying on
> checkpoint recovery before trying again…
Re: Spark Streaming application code change and stateful transformations

This section in the streaming guide also outlines a new option – use 2 versions in parallel for a period of time, controlling the draining / transition at the application level:

http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code

Also – I would not dismiss the graceful shutdown approach, since you're controlling the shutdown. At a minimum, you can monitor whether it was successful; if it failed, you simply restart the app, relying on checkpoint recovery before trying again…

I'm copy-pasting more details from an answer I posted earlier to a similar question:

1. Use 2 versions in parallel, drain the queue up to a point, and start fresh in the new version, only processing events from that point forward.
   * Note that "up to a point" is specific to your state management logic; it might mean "user sessions started after 4 am", NOT "events received after 4 am".
2. Graceful shutdown and saving data to a DB, followed by checkpoint cleanup / a new checkpoint dir.
   * On restart, you need to use the updateStateByKey variant that takes an initialRDD with the values preloaded from the DB.
   * By cleaning the checkpoint in between upgrades, data is loaded only once.

Hope this helps,
-adrian

From: Ofir Kerker
Date: Wednesday, September 16, 2015 at 6:12 PM
To: Cody Koeninger
Cc: "user@spark.apache.org"
Subject: Re: Spark Streaming application code change and stateful transformations
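Adrian's second option above (graceful shutdown, then restart with updateStateByKey seeded from an initialRDD) hinges on the new job's state being preloaded from the DB rather than starting empty. A minimal sketch of that seeding logic in plain Python, standing in for the Spark primitives; all names and data shapes here are illustrative, not Spark API:

```python
# Plain-Python sketch of seeding stateful aggregation from a DB snapshot,
# analogous to the updateStateByKey variant that takes an initialRDD.
# Names and shapes here are illustrative, not Spark API.

def update_state(new_values, current):
    """Fold a batch of new counts into the existing count for one key."""
    return (current or 0) + sum(new_values)

def run_batch(state, batch):
    """Apply update_state key by key, like one micro-batch."""
    keys = set(state) | set(batch)
    return {k: update_state(batch.get(k, []), state.get(k)) for k in keys}

# Snapshot the old app version persisted to the DB on graceful shutdown.
initial_state = {"paris:pageX:20h": 42}

# First micro-batch seen by the upgraded app (fresh checkpoint dir).
batch = {"paris:pageX:20h": [3, 1], "berlin:pageY:20h": [5]}

new_state = run_batch(initial_state, batch)
print(new_state)  # counts continue from the snapshot instead of zero
```

Because the checkpoint dir is cleaned between upgrades, the snapshot is read only once at startup; from then on the new checkpoint takes over.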
Re: Spark Streaming application code change and stateful transformations

Thanks Cody! The 2nd solution is safer but seems wasteful :/
I'll try to optimize it by keeping, in addition to the 'last-complete-hour', the corresponding offsets that bound the incomplete data, to try and fast-forward only the last couple of hours in the worst case.

On Mon, Sep 14, 2015 at 22:14 Cody Koeninger wrote:
> Solution 2 sounds better to me. You aren't always going to have graceful
> shutdowns.
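The optimization described above (persisting, alongside the last-complete-hour, the offsets that bound the still-incomplete data) could look roughly like the following in plain Python. The checkpoint layout and all names are made up for illustration:

```python
# Sketch: pick Kafka starting offsets on restart. If we persisted the
# earliest offset per partition that may still hold events for incomplete
# hours, we fast-forward to it; otherwise we fall back to the start of
# the retention period. All names and structures are illustrative.

RETENTION_START = {0: 0, 1: 0}  # earliest retained offset per partition

def starting_offsets(saved, partitions):
    """Where the upgraded app should begin reading each partition."""
    if saved is None:  # nothing persisted: replay the whole retention window
        return {p: RETENTION_START[p] for p in partitions}
    return {p: saved["incomplete_from"].get(p, RETENTION_START[p])
            for p in partitions}

def keep_event(event_hour, last_complete_hour):
    """Skip events already folded into a completed hourly aggregate."""
    return event_hour > last_complete_hour

saved = {"last_complete_hour": 19, "incomplete_from": {0: 1200, 1: 980}}

print(starting_offsets(saved, [0, 1]))  # {0: 1200, 1: 980}
print(keep_event(19, saved["last_complete_hour"]))  # False: hour 19 is done
print(keep_event(20, saved["last_complete_hour"]))  # True: still incomplete
```

In the worst case (a crash just after an hour closes but before the record is updated) this replays a couple of hours, instead of the whole retention period.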
Re: Spark Streaming application code change and stateful transformations

Solution 2 sounds better to me. You aren't always going to have graceful shutdowns.

On Mon, Sep 14, 2015 at 1:49 PM, Ofir Kerker wrote:
> Hi,
> My Spark Streaming application consumes messages (events) from Kafka every
> 10 seconds using the direct stream approach, aggregates these messages
> into hourly aggregations (to answer analytics questions like: "How many
> users from Paris visited page X between 8PM and 9PM"), and saves the data
> to Cassandra.
>
> I was wondering if there's a good practice for handling a code change in a
> Spark Streaming application that uses stateful transformations
> (updateStateByKey, for example), because the new application code will not
> be able to use the data that was checkpointed by the former application.
> I have thought of a few solutions for this issue and was hoping some of
> you have experience with such a case and can suggest other solutions or
> give feedback on my suggested solutions:
>
> *Solution #1*: On a graceful shutdown, in addition to the current Kafka
> offsets, persist the current aggregated data into Cassandra tables
> (different from the regular aggregation tables) that would allow reading
> them easily when the new application starts, in order to build the
> initial state.
>
> *Solution #2*: When an hour is "complete" (i.e. no more events are
> expected with a timestamp inside that hour), update somewhere persistent
> (DB / shared file) the last-complete-hour. This will allow me, when the
> new application starts, to read all the events from Kafka from the
> beginning of the retention period (last X hours) and ignore events with a
> timestamp smaller than or equal to the last-complete-hour.
>
> I'll be happy to get your feedback!
>
> Thanks,
> Ofir
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-application-code-change-and-stateful-transformations-tp24692.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
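For completeness, the hourly aggregation at the heart of the original question ("how many users from Paris visited page X between 8PM and 9PM") reduces to keying counts by (hour bucket, city, page). A minimal plain-Python sketch, with made-up event field names standing in for whatever the Kafka messages actually carry:

```python
from collections import defaultdict
from datetime import datetime, timezone

def hour_bucket(ts):
    """Truncate an epoch timestamp to the start of its hour (UTC)."""
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    return dt.replace(minute=0, second=0, microsecond=0)

def aggregate(events):
    """Count visits per (hour, city, page) bucket."""
    counts = defaultdict(int)
    for e in events:
        counts[(hour_bucket(e["ts"]), e["city"], e["page"])] += 1
    return dict(counts)

events = [
    {"ts": 1442260800, "city": "Paris", "page": "X"},  # 2015-09-14 20:00 UTC
    {"ts": 1442262000, "city": "Paris", "page": "X"},  # 2015-09-14 20:20 UTC
    {"ts": 1442264400, "city": "Paris", "page": "X"},  # 21:00, next bucket
]
print(aggregate(events))  # two events in the 20h bucket, one in the 21h bucket
```

It is exactly because these buckets close (no more events expected for a past hour) that Solution #2's last-complete-hour marker is safe to use as a replay cutoff.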