Just for clarification: The real-time results should also contain the visitId, correct?
On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Niels!
>
> If you want to use the built-in windowing, you probably need two windows:
>  - One for ID assignment (that immediately pipes elements through)
>  - One for accumulating session elements, and then piping them into files
>    upon session end.
>
> You may be able to use the rolling file sink (roll by 15 minutes) to store
> the files. That is probably the simplest to implement and will serve the
> real time case.
>
>                                       +--> (real time sink)
>                                       |
>   (source) --> (window session ids) --+
>                                       |
>                                       +--> (window session) --> (rolling sink)
>
> You can put this all into one operator that accumulates the session
> elements but still immediately emits the new records (the realtime path),
> if you implement your own windowing/buffering in a custom function.
> This is also very easy to put onto event time then, which makes it
> valuable for processing the history (replay). For this second case I am
> still prototyping some code; give me a bit, I'll get back to you...
>
> Greetings,
> Stephan
>
>
> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi Stephan,
>>
>> I created a first version of the Visit ID assignment like this:
>>
>> First I group by sessionid and I create a Window per visit.
>> The custom Trigger for this window does a 'FIRE' after each element and
>> sets an EventTimer on the 'next possible moment the visit can expire'.
>> To avoid getting 'all events' in the visit after every 'FIRE' I'm using
>> CountEvictor.of(1).
>> When the visit expires I do a PURGE. So if there are more events
>> afterwards for the same sessionId I get a new visit (which is exactly
>> what I want).
>>
>> The last step is that I want to have a 'normal' DataStream again to work
>> with.
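The inactivity-gap logic Niels describes (FIRE on every element, an event timer at the last event plus the gap, PURGE when the timer fires and no newer activity has arrived) can be sketched in plain Java, outside Flink's Trigger API. All class and method names here are illustrative, not Flink's:

```java
// Sketch of the visit-expiry decision described above, stripped of the
// Flink Trigger API. Names are illustrative, not Flink's.
class InactivityGap {
    private final long gapMillis;            // X minutes of allowed inactivity
    private long lastEventTime = Long.MIN_VALUE;

    InactivityGap(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Called per element: record the activity and return the event-time
    // timer to register ("the next possible moment the visit can expire").
    public long onElement(long eventTime) {
        lastEventTime = Math.max(lastEventTime, eventTime);
        return lastEventTime + gapMillis;
    }

    // Called when an event-time timer fires: true means no newer activity
    // was seen, so PURGE the visit; a later event then starts a new visit.
    public boolean onEventTime(long timerTime) {
        return timerTime >= lastEventTime + gapMillis;
    }
}
```

A timer registered by an early event is ignored when later events have pushed the expiry forward, which is what yields exactly one PURGE per visit.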
>> I created this WindowFunction to map the Window stream back to a normal
>> DataStream. Essentially I do this:
>>
>>   DataStream<Foo> visitDataStream =
>>       visitWindowedStream.apply(new WindowToStream<Foo>());
>>
>>   // This is an identity 'apply'
>>   private static class WindowToStream<T>
>>       implements WindowFunction<T, T, String, GlobalWindow> {
>>     @Override
>>     public void apply(String s, GlobalWindow window, Iterable<T> values,
>>                       Collector<T> out) throws Exception {
>>       for (T value : values) {
>>         out.collect(value);
>>       }
>>     }
>>   }
>>
>> The problem with this is that I first create the visitIds in a Window
>> (great). Because I really need to have both the Windowed events AND the
>> near realtime version, I currently break down the Window to get the
>> single events and after that I have to recreate the same Window again.
>>
>> I'm looking forward to the implementation direction you are referring
>> to. I hope you have a better way of doing this.
>>
>> Niels Basjes
>>
>>
>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> Nice use case that you have!
>>> I think you can solve this super nicely with Flink, such that "replay"
>>> and "realtime" are literally the same program - they differ only in
>>> whether the source reads from the beginning or from the end of the
>>> Kafka queue.
>>>
>>> Event time is, like you said, the key thing for "replay". Event time
>>> depends on the progress in the timestamps of the data, so it can
>>> progress at different speeds, depending on what the rate of your
>>> stream is.
>>> With the appropriate data source, it will progress very fast in
>>> "replay mode", so that you replay at "fast forward speed", and it
>>> progresses at the same speed as processing time when you attach to the
>>> end of the Kafka queue.
>>>
>>> When you define the time intervals in your program to react to event
>>> time progress, then you will compute the right sessionization in both
>>> replay and real time settings.
>>>
>>> I am writing a little example code to share.
>>> The type of ID-assignment sessions you want to do needs an
>>> undocumented API right now, so I'll prepare something there for you...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> The sessionid is present in the measurements. It can also be seen as
>>>> a form of 'browser id'. Most websites use either a 'long lived random
>>>> value in a cookie' or an 'application session id' for this.
>>>>
>>>> So with the id of the browser in hand I have the need to group all
>>>> events into "periods of activity" which I call a visit.
>>>> Such a visit is a bounded subset of all events from a single browser.
>>>>
>>>> What I need is to add a (sort of) random visit id to the events, one
>>>> that becomes 'inactive' after more than X minutes of inactivity.
>>>> I then want to add this visitid to each event and
>>>>  1) stream them out in realtime
>>>>  2) wait till the visit ends and store the complete visit on disk
>>>>     (I am going for either AVRO or Parquet).
>>>>
>>>> I want to create disk files with all visits that ended in a specific
>>>> time period. So essentially
>>>>   "Group by round(<timestamp of last event>, 15 minutes)"
>>>>
>>>> Because of the need to be able to 'repair' things I came up with the
>>>> following question:
>>>> In the Flink API I see the 'processing time' (i.e. the actual time of
>>>> the server) and the 'event time' (i.e. the time when an event was
>>>> recorded).
>>>>
>>>> Now in my case all events are in Kafka (for, say, 2 weeks).
>>>> When something goes wrong I want to be able to 'reprocess' everything
>>>> from the start of the queue.
>>>> Here the matter of 'event time' becomes a big question for me: in
>>>> those 'replay' situations the event time will progress at a much
>>>> higher speed than the normal 1 sec/sec.
>>>>
>>>> How does this work in Apache Flink?
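The "fast forward" behaviour asked about here follows from how event-time watermarks are derived: they track the timestamps in the data, not the wall clock. A minimal plain-Java stand-in (a hypothetical class, not Flink's watermark API) makes this concrete:

```java
// Event time advances with the data: the watermark is the highest timestamp
// seen so far minus an out-of-orderness bound. During a Kafka replay the
// data (and hence the watermark) races far ahead of wall-clock speed; at the
// tail of the queue it advances at roughly 1 sec/sec. Hypothetical names.
class BoundedLatenessWatermark {
    private final long maxOutOfOrdernessMillis;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedLatenessWatermark(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    public void onEvent(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    // Current event time: it only moves when the data says so, which is why
    // the same program sessionizes correctly in both replay and realtime.
    public long currentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrdernessMillis;
    }
}
```

Event-time timers (like the visit-expiry timer) fire against this clock, so a replay produces the same sessions as live processing, just faster.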
>>>>
>>>> Niels Basjes
>>>>
>>>>
>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <se...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey Niels!
>>>>>
>>>>> You may be able to implement this in windows anyway, depending on
>>>>> your setup. You can definitely implement state with timeout yourself
>>>>> (using the more low-level state interface), or you may be able to
>>>>> use custom windows for that (they can trigger on every element and
>>>>> return elements immediately, thereby giving you low latency).
>>>>>
>>>>> Can you tell me where exactly the session ID comes from? Is that
>>>>> something that the function with state generates itself?
>>>>> Depending on that answer, I can outline either the window way or the
>>>>> custom state way...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <ni...@basjes.nl>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the explanation.
>>>>>> I have clickstream data arriving in realtime and I need to assign
>>>>>> the visitId and stream it out again (with the visitId now being
>>>>>> part of the record) into Kafka with the lowest possible latency.
>>>>>> Although the Window feature allows me to group and close the visit
>>>>>> on a timeout/expire (as shown to me by Aljoscha in a separate
>>>>>> email), it does make a 'window'.
>>>>>>
>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>
>>>>>> Niels
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Niels!
>>>>>>>
>>>>>>> Currently, state is released by setting the value for the key to
>>>>>>> null. If you are tracking web sessions, you can try and send an
>>>>>>> "end of session" element that sets the value to null.
>>>>>>>
>>>>>>> To be on the safe side, you probably want state that is
>>>>>>> automatically purged after a while.
>>>>>>> I would look into using Windows for that. The triggers there are
>>>>>>> flexible, so you can schedule both actions on elements plus
>>>>>>> cleanup after a certain time delay (clock time or event time).
>>>>>>>
>>>>>>> The question about "state expiry" has come up a few times. People
>>>>>>> seem to like working on state directly, but it should clean up
>>>>>>> automatically.
>>>>>>>
>>>>>>> Can you see if your use case fits onto windows? Otherwise, please
>>>>>>> open a ticket for state expiry.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <ni...@basjes.nl>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm working on a streaming application that ingests clickstream
>>>>>>>> data. In a specific part of the flow I need to retain a little
>>>>>>>> bit of state per visitor (i.e. keyBy(sessionid)).
>>>>>>>>
>>>>>>>> So I'm using the Key/Value state interface (i.e.
>>>>>>>> OperatorState<MyRecord>) in a map function.
>>>>>>>>
>>>>>>>> Now in my application I expect to get a huge number of sessions
>>>>>>>> per day. Since these sessionids are 'random' and become unused
>>>>>>>> after the visitor leaves the website, over time the system will
>>>>>>>> have seen millions of those sessionids.
>>>>>>>>
>>>>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>
>>>>>>>> Niels Basjes
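The release-by-null contract Stephan describes can be modelled with a plain key/value map. This stand-in (not Flink's OperatorState interface; all names illustrative) shows why state for millions of expired sessionids lingers unless something writes null for each key:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the keyed state contract discussed in the thread:
// updating a key with null releases its state; nothing else removes it.
class KeyedState<K, V> {
    private final Map<K, V> store = new HashMap<>();

    public V value(K key) {
        return store.get(key);
    }

    // "State is released by setting the value for the key to null."
    public void update(K key, V value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    // Without an explicit null update (or window-based expiry), this grows
    // with every sessionid the system has ever seen.
    public int trackedKeys() {
        return store.size();
    }
}
```

An "end of session" element would call `update(sessionid, null)`; the window-based alternative achieves the same cleanup via a trigger's time delay instead.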