Thanks!

On Mon, 18 Feb 2019 at 12:36, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Stephen,
>
> Sorry for the late response.
> If you don't need to match open and close events, your approach of using a
> flatMap to fan-out for the hierarchical folder structure and a window
> operator (or two for open and close) for counting and aggregating should be
> a good design.
>
> Best, Fabian
>
> On Mon, 11 Feb 2019 at 11:29, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>>
>>
>> On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Stephen,
>>>
>>> A window is created with the first record that is assigned to it.
>>> If the windows are based on time and a key, then no window will be
>>> created (and no space will be occupied) if there is no first record for a
>>> key and time interval.
>>>
>>> Anyway, if tracking the number of open files & average opening time is
>>> your use case, you might want to implement the logic with a ProcessFunction
>>> instead of a window.
>>> The reason is that time windows don't share state, i.e., the
>>> information about an opened but not yet closed file would not be "carried
>>> over" to the next window.
>>> However, if you use a ProcessFunction, you are responsible for cleaning
>>> up the state.
>>>
>>
>> Ahh but I am cheating by ensuring the events are rich enough that I do
>> not need to match them.
>>
>> I get the "open" events (they are not really "open" events - I have mapped
>> to an analogy... it might be more like a build job start event... or
>> not... I'm not at liberty to say ;-) ) because I need to count the number
>> of "open"s per time period.
>>
>> I get the "close" events and they include the duration plus other
>> information that can then be transformed into the required metrics... yes I
>> could derive the "open" from the "close" by subtracting the duration but:
>>
>> 1. they would cross window boundaries quite often, leading to repeated
>> fetch-update-write operations on the backing data store
>> 2. they wouldn't be as "live" and one of the things we need to know is
>> how many "open"s there are in the previous window... given some durations
>> can be many days, waiting for the "close" event to create the "open" metric
>> would not be a good plan.
>>
>> Basically, I am pushing some of the calculations to the edge where there
>> is state that makes those calculations cheap and then the rich events are
>> *hopefully* easy to aggregate with just simple aggregation functions that
>> only need to maintain the running total... at least that's what the PoC I
>> am experimenting with in Flink should show.
>>
>>
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> On Sun, 10 Feb 2019 at 20:36, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> This sounds reasonable to me.
>>>>>
>>>>> I'm a bit confused by this question: "*Additionally, I am (naïvely)
>>>>> hoping that if a window has no events for a particular key, the
>>>>> memory/storage costs are zero for that key.*"
>>>>>
>>>>> Are you asking whether a key that was received in window X (as part of
>>>>> an event) is still present in window X+1? If so, then the answer is no; a
>>>>> key will only be present in a given window if an event was received that
>>>>> fits into that window.
>>>>>
>>>>
>>>> To confirm:
>>>>
>>>> So let's say I'm tracking the average time a file is open in folders.
>>>>
>>>> In window N we get the events:
>>>>
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>>>
>>>> So there will be aggregates stored for
>>>> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>>>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>>>>
>>>> In window N+1 we do not get any events at all.
>>>>
>>>> So the memory used by my aggregation functions from window N will be
>>>> freed and the storage will be effectively zero (modulo any follow on
>>>> processing that might be on a longer window)
>>>>
>>>> This seems to be what you are saying... in which case my naïve hope
>>>> was not so naïve! w00t!
>>>>
>>>>
>>>>>
>>>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>>>
>>>>> Ok, I'll try and map my problem into something that should be familiar
>>>>> to most people.
>>>>>
>>>>> Consider a collection of PCs, each of which has a unique ID, e.g.
>>>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>>>
>>>>> Each PC has a tree of local files. Some of the file paths are
>>>>> coincidentally the same names, but there is no file sharing between PCs.
>>>>>
>>>>> I need to produce metrics about how often files are opened and how
>>>>> long they are open for.
>>>>>
>>>>> I need for every X minute tumbling window not just the cumulative
>>>>> averages for each PC, but the averages for each file as well as the
>>>>> cumulative averages for each folder and their sub-folders.
>>>>>
>>>>> I have a stream of events like
>>>>>
>>>>>
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>>>>> {"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
>>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>>>>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>>>>
>>>>> So from that I would like to know stuff like:
>>>>>
>>>>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>>>>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>>>>> average time open was (67+97+196)/3=120... there is no guarantee that the
>>>>> closes will be matched with opens in the same window, which is why I'm
>>>>> tracking them separately
>>>>> de:ad:be:ef had 2/X opens per minute in the X minute window
>>>>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>>>>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and
>>>>> the average time open was 120
>>>>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>>>>> minute window
>>>>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X
>>>>> minute window
>>>>> etc
>>>>>
>>>>> What I think I want to do is turn each event into a series of events
>>>>> with different keys, so that
>>>>>
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>>
>>>>> gets sent under the keys:
>>>>>
>>>>> ("ca:fe:ba:be","/")
>>>>> ("ca:fe:ba:be","/foo")
>>>>> ("ca:fe:ba:be","/foo/bar")
>>>>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>>>>
>>>>> Then I could use a window aggregation function to just:
>>>>>
>>>>> * count the "open" events
>>>>> * count the "close" events and sum their duration
>>>>>
>>>>> Additionally, I am (naïvely) hoping that if a window has no events
>>>>> for a particular key, the memory/storage costs are zero for that key.
>>>>>
>>>>> From what I can see, to achieve what I am trying to do, I could use a
>>>>> flatMap followed by a keyBy
>>>>>
>>>>> In other words, I take the events and flat map them based on the path
>>>>> split on '/', returning a Tuple of the (to be) key and the event. Then I
>>>>> can use keyBy to key on field 0 of the Tuple.
>>>>>
>>>>> My ask:
>>>>>
>>>>> Is the above design a good design? How would you achieve the end goal
>>>>> better? Do I need to worry about many paths that are accessed rarely and
>>>>> would have an accumulator that stays at 0 unless there are events
>>>>> in that window, or are the accumulators for each distinct key eagerly
>>>>> purged after each trigger fires?
>>>>>
>>>>> What gotchas do I need to look out for?
>>>>>
>>>>> Thanks in advance and apologies for the length
>>>>>
>>>>> -stephenc
>>>>>
>>>>>
>>>>>
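
For anyone reading this thread later, here is a rough sketch of the design
discussed above: fan each event out to one key per path prefix, then count
opens and closes (and sum durations) per key and tumbling window. It is plain
Python used to illustrate the logic, not Flink API code. The function names
fan_out_keys and aggregate, the 60-second window size and the event
timestamps are all assumptions made for the example; the events in the thread
carry no timestamp field.

```python
def fan_out_keys(source, path):
    """Yield (source, prefix) for the root, every ancestor folder and the path itself.

    Plays the role of the flatMap step in the thread (hypothetical helper).
    """
    yield (source, "/")
    prefix = ""
    for part in path.strip("/").split("/"):
        if not part:
            continue  # skip empty segments, e.g. when the path is just "/"
        prefix = prefix + "/" + part
        yield (source, prefix)


def aggregate(events, window_ms):
    """Count opens/closes and sum durations per (window_start, key).

    `events` is an iterable of (timestamp_ms, event_dict); the timestamp is an
    assumption, since the events in the thread do not include one.
    An accumulator is only created when the first event for a key and window
    arrives, so keys with no events in a window take up no space.
    """
    state = {}
    for ts, ev in events:
        window_start = ts - ts % window_ms  # tumbling window assignment
        for key in fan_out_keys(ev["source"], ev["path"]):
            acc = state.setdefault(
                (window_start, key), {"opens": 0, "closes": 0, "duration": 0}
            )
            if ev["action"] == "open":
                acc["opens"] += 1
            elif ev["action"] == "close":
                acc["closes"] += 1
                acc["duration"] += int(ev["duration"])
    return state


# The nine sample events from the original question, one second apart
# (made-up timestamps), so they all fall into the first 60-second window.
raw = [
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/README.txt"},
    {"source": "ca:fe:ba:be", "action": "close", "path": "/foo/bar/README.txt", "duration": "67"},
    {"source": "de:ad:be:ef", "action": "open", "path": "/foo/manchu/README.txt"},
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/User guide.txt"},
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/Admin guide.txt"},
    {"source": "ca:fe:ba:be", "action": "close", "path": "/foo/bar/User guide.txt", "duration": "97"},
    {"source": "ca:fe:ba:be", "action": "close", "path": "/foo/bar/Admin guide.txt", "duration": "196"},
    {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/manchu/README.txt"},
    {"source": "de:ad:be:ef", "action": "open", "path": "/bar/foo/README.txt"},
]
sample = [(i * 1000, ev) for i, ev in enumerate(raw)]

result = aggregate(sample, window_ms=60_000)
root = result[(0, ("ca:fe:ba:be", "/"))]
average_open_time = root["duration"] / root["closes"]
```

With these inputs, the root key for ca:fe:ba:be ends up with 4 opens, 3 closes
and a summed duration of 360, which gives the average of 120 mentioned in the
original question. In a real Flink job the dictionary would be replaced by
keyed window state, and the window would be purged after it fires.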
