Thanks!

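For the archives, here is roughly the shape of the fan-out and windowing
discussed in the quoted thread below. It is only a minimal sketch rather than
the actual PoC code: the FanOutPipeline/OpenCloseAggregate class names and the
FileEvent POJO (with getSource()/getAction()/getPath()/getDuration() accessors
for the JSON fields) are made up for illustration, the window size is
hard-coded to 5 minutes to stand in for "X", and timestamp/watermark
assignment is assumed to happen upstream.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FanOutPipeline {

    // Fan each event out to (source, "/"), (source, ancestor folder), ...,
    // (source, full path), key on that tuple, and aggregate per tumbling window.
    public static DataStream<Tuple3<Long, Long, Double>> metricsPerKey(
            DataStream<FileEvent> events) {
        return events
            .flatMap(new FlatMapFunction<FileEvent, Tuple2<Tuple2<String, String>, FileEvent>>() {
                @Override
                public void flatMap(FileEvent event,
                                    Collector<Tuple2<Tuple2<String, String>, FileEvent>> out) {
                    String source = event.getSource();
                    String path = event.getPath();
                    // "/foo/bar/README.txt" fans out to "/", "/foo", "/foo/bar"
                    // and finally the full path itself
                    out.collect(Tuple2.of(Tuple2.of(source, "/"), event));
                    int idx = path.indexOf('/', 1);
                    while (idx > 0) {
                        out.collect(Tuple2.of(Tuple2.of(source, path.substring(0, idx)), event));
                        idx = path.indexOf('/', idx + 1);
                    }
                    out.collect(Tuple2.of(Tuple2.of(source, path), event));
                }
            })
            .keyBy(0) // key = the (source, path) tuple emitted by the flatMap
            .window(TumblingEventTimeWindows.of(Time.minutes(5))) // "X" = 5 minutes here
            .aggregate(new OpenCloseAggregate()); // sketched at the bottom of this mail
    }
}

The OpenCloseAggregate it refers to only keeps running totals per key and
window; a sketch of it is at the very bottom of this mail, below the quoted
thread.
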
On Mon, 18 Feb 2019 at 12:36, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Stephen,
>
> Sorry for the late response.
> If you don't need to match open and close events, your approach of using a
> flatMap to fan out for the hierarchical folder structure and a window
> operator (or two, for open and close) for counting and aggregating should
> be a good design.
>
> Best, Fabian
>
> Am Mo., 11. Feb. 2019 um 11:29 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>> On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Stephen,
>>>
>>> A window is created with the first record that is assigned to it.
>>> If the windows are based on time and a key, then no window will be
>>> created (and no space will be occupied) if there is no first record for
>>> that key and time interval.
>>>
>>> Anyway, if tracking the number of open files & average opening time is
>>> your use case, you might want to implement the logic with a
>>> ProcessFunction instead of a window.
>>> The reason is that time windows don't share state, i.e., the information
>>> about an opened but not yet closed file would not be "carried over" to
>>> the next window.
>>> However, if you use a ProcessFunction, you are responsible for cleaning
>>> up the state.
>>>
>>
>> Ahh, but I am cheating by ensuring the events are rich enough that I do
>> not need to match them.
>>
>> I get the "open" events (they are not really "open" events - I have
>> mapped them to an analogy... it might be more like build job start
>> events... or not... I'm not at liberty to say ;-) ) because I need to
>> count the number of "open"s per time period.
>>
>> I get the "close" events, and they include the duration plus other
>> information that can then be transformed into the required metrics...
>> Yes, I could derive the "open" from the "close" by subtracting the
>> duration, but:
>>
>> 1. they would cross window boundaries quite often, leading to repeated
>> fetch-update-write operations on the backing data store
>> 2. they wouldn't be as "live", and one of the things we need to know is
>> how many "open"s there are in the previous window... given that some
>> durations can be many days, waiting for the "close" event to create the
>> "open" metric would not be a good plan.
>>
>> Basically, I am pushing some of the calculations to the edge, where there
>> is state that makes those calculations cheap, and then the rich events
>> are *hopefully* easy to aggregate with just simple aggregation functions
>> that only need to maintain the running total... at least that's what the
>> PoC I am experimenting with in Flink should show.
>>
>>> Hope this helps,
>>> Fabian
>>>
>>> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
>>> stephen.alan.conno...@gmail.com>:
>>>
>>>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> This sounds reasonable to me.
>>>>>
>>>>> I'm a bit confused by this question: "*Additionally, I am (naïvely)
>>>>> hoping that if a window has no events for a particular key, the
>>>>> memory/storage costs are zero for that key.*"
>>>>>
>>>>> Are you asking whether a key that was received in window X (as part of
>>>>> an event) is still present in window X+1? If so, then the answer is
>>>>> no; a key will only be present in a given window if an event was
>>>>> received that fits into that window.
>>>>>
>>>>
>>>> To confirm:
>>>>
>>>> So let's say I'm tracking the average time a file is opened in folders.
>>>>
>>>> In window N we get the events:
>>>>
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>>>
>>>> So there will be aggregates stored for ("ca:fe:ba:be","/"),
>>>> ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>>>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc.
>>>>
>>>> In window N+1 we do not get any events at all.
>>>>
>>>> So the memory used by my aggregation functions from window N will be
>>>> freed, and the storage will be effectively zero (modulo any follow-on
>>>> processing that might be on a longer window).
>>>>
>>>> This seems to be what you are saying... in which case my naïve hope was
>>>> not so naïve! w00t!
>>>>
>>>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>>>
>>>>> Ok, I'll try and map my problem onto something that should be familiar
>>>>> to most people.
>>>>>
>>>>> Consider a collection of PCs, each of which has a unique ID, e.g.
>>>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>>>
>>>>> Each PC has a tree of local files. Some of the file paths are
>>>>> coincidentally the same names, but there is no file sharing between
>>>>> PCs.
>>>>>
>>>>> I need to produce metrics about how often files are opened and how
>>>>> long they are open for.
>>>>>
>>>>> I need, for every X-minute tumbling window, not just the cumulative
>>>>> averages for each PC, but the averages for each file as well as the
>>>>> cumulative averages for each folder and its sub-folders.
>>>>>
>>>>> I have a stream of events like:
>>>>>
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>>>>> {"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
>>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>>>>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>>>>
>>>>> So from that I would like to know stuff like:
>>>>>
>>>>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>>>>> ca:fe:ba:be had 3/X closes per minute in the X minute window, and the
>>>>> average time open was (67+97+196)/3=120...
>>>>> there is no guarantee that the closes will be matched with opens in
>>>>> the same window, which is why I'm only tracking them separately
>>>>> de:ad:be:ef had 2/X opens per minute in the X minute window
>>>>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>>>>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and
>>>>> the average time open was 120
>>>>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>>>>> minute window
>>>>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X
>>>>> minute window
>>>>> etc.
>>>>>
>>>>> What I think I want to do is turn each event into a series of events
>>>>> with different keys, so that
>>>>>
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>>
>>>>> gets sent under the keys:
>>>>>
>>>>> ("ca:fe:ba:be","/")
>>>>> ("ca:fe:ba:be","/foo")
>>>>> ("ca:fe:ba:be","/foo/bar")
>>>>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>>>>
>>>>> Then I could use a window aggregation function to just:
>>>>>
>>>>> * count the "open" events
>>>>> * count the "close" events and sum their duration
>>>>>
>>>>> Additionally, I am (naïvely) hoping that if a window has no events for
>>>>> a particular key, the memory/storage costs are zero for that key.
>>>>>
>>>>> From what I can see, to achieve what I am trying to do, I could use a
>>>>> flatMap followed by a keyBy.
>>>>>
>>>>> In other words, I take the events and flatMap them based on the path
>>>>> split on '/', returning a Tuple of the (to-be) key and the event. Then
>>>>> I can use keyBy to key on field 0 of the Tuple.
>>>>>
>>>>> My ask:
>>>>>
>>>>> Is the above design a good design? How would you achieve the end game
>>>>> better? Do I need to worry about many paths that are accessed rarely
>>>>> and would have an accumulator function that stays at 0 unless there
>>>>> are events in that window... or are the accumulators for each distinct
>>>>> key eagerly purged after each fire trigger?
>>>>>
>>>>> What gotchas do I need to look out for?
>>>>>
>>>>> Thanks in advance, and apologies for the length.
>>>>>
>>>>> -stephenc
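
(For completeness, here is the aggregation side that goes with the fan-out
sketch at the top of this mail. Again it is only a sketch against the same
made-up FileEvent POJO, with the duration assumed to already be parsed into a
long; the point is that each (source, path) key only ever holds three running
totals per window, which is what should keep the state cheap.)

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Counts "open" events, counts "close" events and sums their durations,
// then emits (opens, closes, average open duration) for the key and window.
public class OpenCloseAggregate
        implements AggregateFunction<Tuple2<Tuple2<String, String>, FileEvent>,
                                     OpenCloseAggregate.Acc,
                                     Tuple3<Long, Long, Double>> {

    public static class Acc {
        long opens;
        long closes;
        long totalDuration;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Acc add(Tuple2<Tuple2<String, String>, FileEvent> value, Acc acc) {
        FileEvent event = value.f1;
        if ("open".equals(event.getAction())) {
            acc.opens++;
        } else if ("close".equals(event.getAction())) {
            acc.closes++;
            acc.totalDuration += event.getDuration();
        }
        return acc;
    }

    @Override
    public Tuple3<Long, Long, Double> getResult(Acc acc) {
        // average open duration over the closes seen in this window
        double avgDuration = acc.closes == 0 ? 0.0 : ((double) acc.totalDuration) / acc.closes;
        return Tuple3.of(acc.opens, acc.closes, avgDuration);
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        a.opens += b.opens;
        a.closes += b.closes;
        a.totalDuration += b.totalDuration;
        return a;
    }
}

In practice I would probably use the two-argument
.aggregate(aggregateFunction, processWindowFunction) variant so that each
emitted result is tagged with its (source, path) key and the window bounds
before being written out.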