Thanks again, Cody, for your answer. The idea here is to process all events, but to
launch the big job (which takes longer than the batch interval) only for the events that
are the last ones for a given id, considering the current state of the data. Knowing
whether an event is the last one is in fact my issue.

So I think I need two jobs: one to maintain a view of these last events by id, and one
that compares the events in the DStream against that view and launches the big job only
when needed. It is something of a trick to avoid accumulating latency. I know the main
problem comes from the fact that my job is too long, but this feature would buy time to
address that main problem, which will take longer to fix (changes to the architecture are
probably needed).
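A rough sketch of what the second job could look like, under a few assumptions that are
not in the thread: events arrive as (id, event) pairs, each event carries a timestamp,
and the first job exposes its "last event by id" view through a hypothetical latestView()
accessor returning an RDD of (id, latestTimestamp):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    // Assumed event shape; the real event type would differ.
    case class Event(id: String, timestamp: Long)

    // Keep only the events that are still the latest according to the first job's view.
    // latestView() is a hypothetical accessor (broadcast, table, external store, ...).
    def keepOnlyLatest(events: DStream[(String, Event)],
                       latestView: () => RDD[(String, Long)]): DStream[(String, Event)] =
      events.transform { batch =>
        batch.join(latestView())                          // (id, (event, latestTs))
             .filter { case (_, (event, latestTs)) => event.timestamp >= latestTs }
             .mapValues { case (event, _) => event }
      }

The big job would then run only on the output of keepOnlyLatest, so intermediate events
for an id are joined away before any expensive work starts.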
On Wed, Jan 6, 2016 at 6:06 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If your job consistently takes longer than the batch time to process, you will keep
> lagging longer and longer behind. That's not sustainable; you need to increase batch
> sizes or decrease processing time. In your case, probably increase batch size, since
> you're pre-filtering it down to only 1 event per user.
>
> There's no way around this. You can't just magically ignore some time range of RDDs,
> because they may contain events you care about.
>
> On Wed, Jan 6, 2016 at 10:55 AM, Julien Naour <julna...@gmail.com> wrote:
>
>> The following is my understanding of Spark Streaming, AFAIK; I could be wrong:
>>
>> Spark Streaming processes data from a stream in micro-batches, one at a time. When
>> processing takes time, the DStream's RDDs accumulate. So in my case (my processing
>> takes time) the DStream's RDDs accumulate. What I want to do is skip all intermediate
>> events in this queue of RDDs.
>>
>> I may be wrong, but with your solution I would get the last events by id for each
>> individual RDD in the DStream. What I want is to process only the events that are the
>> last ones for an id across all of the DStream's RDDs that have accumulated.
>>
>> Regards,
>>
>> Julien
>>
>> On Wed, Jan 6, 2016 at 5:35 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> If you don't have hot users, you can use the user id as the hash key when publishing
>>> into Kafka. That will put all events for a given user in the same partition per
>>> batch. Then you can do foreachPartition with a local map to store just a single
>>> event per user, e.g.:
>>>
>>> foreachPartition { p =>
>>>   val m = new HashMap()
>>>   p.foreach { event =>
>>>     m.put(event.user, event)   // keep only the last event seen per user
>>>   }
>>>   m.foreach { case (user, event) =>
>>>     // ... do your computation
>>>   }
>>> }
>>>
>>> On Wed, Jan 6, 2016 at 10:09 AM, Julien Naour <julna...@gmail.com> wrote:
>>>
>>>> Thanks for your answer.
>>>>
>>>> As I understand it, a consumer that stays caught up will read every message even
>>>> with compaction, so for a pure Kafka Spark Streaming setup it would not be a
>>>> solution.
>>>>
>>>> Perhaps I could reconnect to the Kafka topic after each batch to get the last state
>>>> of the events and then compare it to the current Spark Streaming events, but that
>>>> seems a little tricky. For each event it would connect to Kafka, fetch the current
>>>> state by key (possibly a lot of data) and then compare it to the current event.
>>>> Latency could then be an issue.
>>>>
>>>> To be more specific about my issue: my events have keys corresponding to some kind
>>>> of user id. I want to process only the last event for each user id, i.e. skip
>>>> intermediate events per user id. I have a single Kafka topic with all these events.
>>>>
>>>> Regards,
>>>>
>>>> Julien Naour
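A minimal sketch of the in-batch deduplication idea above (one event per user id per
micro-batch), assuming an event type with a user id and a timestamp; reduceByKey here
plays the same role as the per-partition HashMap:

    import org.apache.spark.streaming.dstream.DStream

    // Assumed event shape: a user id and a timestamp.
    case class Event(user: String, timestamp: Long)

    // One event per user per batch: the one with the greatest timestamp.
    def latestPerBatch(stream: DStream[Event]): DStream[(String, Event)] =
      stream.map(e => (e.user, e))
            .reduceByKey((a, b) => if (a.timestamp >= b.timestamp) a else b)

This keeps the newest event per user within each batch, but by itself it still yields one
"last" event per batch rather than per accumulated backlog, which is the limitation
discussed above.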
>>>> On Wed, Jan 6, 2016 at 4:13 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> Have you read
>>>>>
>>>>> http://kafka.apache.org/documentation.html#compaction
>>>>>
>>>>> On Wed, Jan 6, 2016 at 8:52 AM, Julien Naour <julna...@gmail.com> wrote:
>>>>>
>>>>>> Context: process data coming from Kafka and send the results back to Kafka.
>>>>>>
>>>>>> Issue: each event can take several seconds to process (work is in progress to
>>>>>> improve that). During that time, events (and RDDs) accumulate. Intermediate
>>>>>> events (by key) do not have to be processed, only the last ones. So when one
>>>>>> processing run finishes, it would be ideal if Spark Streaming could skip all
>>>>>> events that are not the current last ones (by key).
>>>>>>
>>>>>> I'm not sure this can be done using only the Spark Streaming API. As I understand
>>>>>> Spark Streaming, the DStream's RDDs accumulate and are processed one by one,
>>>>>> without considering whether others have arrived afterwards.
>>>>>>
>>>>>> Possible solutions:
>>>>>>
>>>>>> 1. Use only the Spark Streaming API, but I'm not sure how. updateStateByKey seems
>>>>>> to be a solution, but I'm not sure it will work properly when DStream RDDs
>>>>>> accumulate and only the last events by key have to be processed.
>>>>>>
>>>>>> 2. Have two Spark Streaming pipelines: one to track the last updated event by key
>>>>>> and store it in a map or a database; the second processes an event only if it is
>>>>>> the last one, as indicated by the first pipeline.
>>>>>>
>>>>>> Sub-questions for the second solution:
>>>>>>
>>>>>> Could two pipelines share the same StreamingContext and process the same DStream
>>>>>> at different speeds (slow processing vs. fast)?
>>>>>>
>>>>>> Is it easily possible to share values (a map, for example) between pipelines
>>>>>> without using an external database? I think an accumulator/broadcast could work,
>>>>>> but between two pipelines I'm not sure.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Julien Naour
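A minimal sketch of the updateStateByKey option from the first possible solution,
assuming (userId, event) pairs with a timestamp on each event; it keeps only the most
recent event seen per key, and the expensive job would then consume lastByKey instead of
the raw stream:

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream

    // Assumed event shape; the payload stands in for whatever the big job needs.
    case class Event(userId: String, timestamp: Long, payload: String)

    // Track the most recent event seen so far for each user id.
    // Checkpointing must be enabled for stateful operations.
    def trackLatest(ssc: StreamingContext,
                    events: DStream[(String, Event)]): DStream[(String, Event)] = {
      ssc.checkpoint("/tmp/streaming-checkpoint") // hypothetical checkpoint directory
      events.updateStateByKey[Event] { (newEvents: Seq[Event], state: Option[Event]) =>
        val all = state.toSeq ++ newEvents
        if (all.isEmpty) None else Some(all.maxBy(_.timestamp))
      }
    }

Whether this holds up when batches back up is exactly the open question above: the state
stream emits every key each batch, so the expensive work would still need to be limited
to keys whose latest event actually changed.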