Ok, I have a better idea of what you’re trying to do now.

I think the problem might be the map. The first time the function runs for a
given key, currentValue will be None, and calling map on None just returns None.

Instead, try:

Some(currentValue.getOrElse(Seq.empty) ++ newValues)

I think that should give you the expected result.
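
Put together, the function would look something like this (just a sketch,
based on the signature from your code below):

    def updateGroupByKey(newValues: Seq[(String, Long, Long)],
                         currentValue: Option[Seq[(String, Long, Long)]]
                        ): Option[Seq[(String, Long, Long)]] = {
      // Fall back to an empty Seq when there is no previous state for this
      // key, so the first batch of values for a new key isn't dropped.
      Some(currentValue.getOrElse(Seq.empty) ++ newValues)
    }

    val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)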


From: Pierce Lamb <richard.pierce.l...@gmail.com>
Date: Thursday, December 18, 2014 at 2:31 PM
To: Silvio Fiorito <silvio.fior...@granturing.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Help with updateStateByKey

Hi Silvio,

This is a great suggestion (I wanted to get rid of groupByKey). I have been
trying to implement it this morning but am having some trouble. I would love
to see your code for the function that goes inside updateStateByKey.

Here is my current code:

    def updateGroupByKey(newValues: Seq[(String, Long, Long)],
                         currentValue: Option[Seq[(String, Long, Long)]]
                        ): Option[Seq[(String, Long, Long)]] = {
      currentValue.map { v => v ++ newValues }
    }

    val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)


However, when I run it, the grouped DStream doesn't get populated with anything.
The issue is probably that currentValue is not actually an Option[Seq[triple]]
but rather an Option[triple]. However, if I change it to an Option[triple], then
I also have to return an Option[triple] for updateStateByKey to compile, but I
want the return value to be an Option[Seq[triple]], because ultimately I want
the data to look like (IPaddress, Seq[(pageRequested, startTime, endTime),
(pageRequested, startTime, endTime)...]) and have that Seq build up over time.

Am I thinking about this wrong?

Thank you

On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
Hi Pierce,

You shouldn’t have to use groupByKey because updateStateByKey will get a
Seq of all the values for that key already.
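
For reference, on a DStream of (K, V) pairs the relevant signature is roughly
(modulo an implicit ClassTag):

    def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

so the Seq is every value that arrived for that key in the current batch, and
the Option is whatever state you returned for that key last time (None the
first time the key is seen).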

I used that for realtime sessionization as well. What I did was key my
incoming events, then send them to updateStateByKey. The updateStateByKey
function then received a Seq of the events and the Option of the previous
state for that key. The sessionization code then did its thing to check
whether the incoming events were part of the same session, based on a
configured timeout. If a session was already active (from the previous state)
and it hadn't exceeded the timeout, it reused that session. Otherwise it
generated a new session id. The return value of the updateStateByKey function
was then a tuple of session id and last timestamp.
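
The shape of that update function was roughly the following (just a sketch;
the names and the timeout value here are made up, and my real code did more):

    // State per key is a (sessionId, lastTimestamp) tuple; incoming values
    // are event timestamps in millis for that key in the current batch.
    val sessionTimeoutMs = 30 * 60 * 1000L  // example timeout

    def updateSession(newTimestamps: Seq[Long],
                      prevState: Option[(String, Long)]): Option[(String, Long)] = {
      if (newTimestamps.isEmpty) {
        prevState  // no new events this batch, keep the session as-is for now
      } else {
        val latest = newTimestamps.max
        prevState match {
          // Previous session still within the timeout: reuse its id
          case Some((sessionId, lastTs)) if latest - lastTs <= sessionTimeoutMs =>
            Some((sessionId, latest))
          // No previous session, or it timed out: generate a new session id
          case _ =>
            Some((java.util.UUID.randomUUID().toString, latest))
        }
      }
    }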

Then I joined the DStream with the session ids (both were keyed off the same
id) and continued my processing. Your requirements may be different, but
that's what worked well for me.
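
The wiring around it was roughly this (again a sketch; keyedEvents is assumed
to be a DStream[(String, Long)] of id/event-timestamp pairs built upstream):

    // Maintain per-key session state across batches (this requires a
    // checkpoint directory to be set on the StreamingContext).
    val sessions = keyedEvents.updateStateByKey(updateSession)

    // Join the events with their current session state (both DStreams are
    // keyed by the same id) and continue processing from there.
    val sessionized = keyedEvents.join(sessions)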

Another thing to consider is cleaning up old sessions by returning None in
the updateStateByKey function. This will help with long-running apps and
minimize memory usage (and checkpoint size).
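
In the sketch above, that just means the empty-batch branch (the one that
currently returns prevState unchanged) becomes something like:

    // Keep the session only while it is still within the timeout; once it
    // has gone idle too long, return None so Spark removes this key from
    // the state (wall-clock comparison here is purely for illustration).
    prevState.filter { case (_, lastTs) =>
      System.currentTimeMillis() - lastTs <= sessionTimeoutMs
    }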

I was using something similar to the method above on a live production
stream with a very small CPU and memory footprint, running for weeks at a
time, processing up to 15M events per day with fluctuating traffic.

Thanks,
Silvio



On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com> wrote:

>I am trying to run stateful Spark Streaming computations over (fake)
>apache web server logs read from Kafka. The goal is to "sessionize"
>the web traffic similar to this blog post:
>http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>
>The only difference is that I want to "sessionize" each page the IP
>hits, instead of the entire session. I was able to do this reading
>from a file of fake web traffic using Spark in batch mode, but now I
>want to do it in a streaming context.
>
>Log files are read from Kafka and parsed into K/V pairs of
>
>(String, (String, Long, Long)) or
>
>(IP, (requestPage, time, time))
>
>I then call "groupByKey()" on this K/V pair. In batch mode, this would
>produce a:
>
>(String, CollectionBuffer((String, Long, Long), ...)) or
>
>(IP, CollectionBuffer((requestPage, time, time), ...))
>
>In a StreamingContext, it produces a:
>
>(String, ArrayBuffer((String, Long, Long), ...)) like so:
>
>(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>
>However, as the next microbatch (DStream) arrives, this information is
>discarded. Ultimately what I want is for that ArrayBuffer to fill up
>over time as a given IP continues to interact and to run some
>computations on its data to "sessionize" the page time. I believe the
>operator to make that happen is "updateStateByKey." I'm having some
>trouble with this operator (I'm new to both Spark & Scala); any help
>is appreciated.
>
>Thus far:
>
>    val grouped =
>      ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>
>    def updateGroupByKey(
>        a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>        b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>        ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>
>    }
>