Hey Aljoscha,

that sounds very promising, awesome! Though I would still need to implement my 
own window management logic (window assignment and window state purging), 
right? I was thinking about reusing some of the existing components 
(TimeWindow and WindowAssigner), but running my own WindowOperator (i.e. a 
ProcessFunction). But I am not sure whether that can be done easily. I would 
love to hear your opinion on that, and what the tricky parts will be, for 
example common mistakes you ran into while developing the windowing mechanism.
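
To make the question concrete, this is roughly the bookkeeping I imagine having to implement myself: assigning elements to tumbling windows and purging window state once the per-key watermark passes a window's end. A plain-Java sketch of the logic only; all names are mine, none of this is Flink API:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Per-key window state for one sensor: tumbling-window assignment plus
// purging of windows that a (per-key) watermark has passed.
class PerKeyWindowState {
    private final long windowSizeMs;
    // windowStart -> running aggregate (here: a simple sum)
    private final Map<Long, Double> openWindows = new HashMap<>();

    PerKeyWindowState(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

    // Tumbling-window assignment: align the timestamp to the window size.
    public long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public void add(long timestampMs, double value) {
        openWindows.merge(windowStart(timestampMs), value, Double::sum);
    }

    // Purge every window whose end is at or below the watermark;
    // returns how many windows were emitted/cleared.
    public int purgeUpTo(long watermarkMs) {
        int purged = 0;
        Iterator<Map.Entry<Long, Double>> it = openWindows.entrySet().iterator();
        while (it.hasNext()) {
            long start = it.next().getKey();
            if (start + windowSizeMs <= watermarkMs) { it.remove(); purged++; }
        }
        return purged;
    }

    public int openCount() { return openWindows.size(); }
}
```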

best, Stephan


> On 14 Nov 2016, at 19:05, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Stephan,
> I was going to suggest that using a flatMap and tracking the timestamp of 
> each key yourself is a bit like having a per-key watermark. I wanted to wait 
> a bit before answering because I'm currently working on a new type of 
> Function that will be released with Flink 1.2: ProcessFunction. This is 
> somewhat like a FlatMap but also gives access to the element timestamp, 
> lets you query the current processing time/event time, and lets you set 
> (per-key) timers for processing time and event time. With this you should be 
> able to easily implement your per-key tracking, I hope.
> 
> Cheers,
> Aljoscha
> 
> P.S. ProcessFunction is already in the Flink repository but it's called 
> TimelyFlatMapFunction right now, because I was working on it under that 
> working title.
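
If I understand correctly, the per-key tracking this enables could look roughly like the following: remember the highest timestamp seen per key and derive a per-key "watermark" from it, minus a bounded out-of-orderness allowance. A plain-Java sketch of the idea with invented names, not the actual ProcessFunction API:

```java
import java.util.HashMap;
import java.util.Map;

// Tracks, per key, the maximum element timestamp seen so far and derives
// a per-key watermark from it (assuming bounded out-of-orderness per sensor).
class PerKeyWatermarks {
    private final long maxOutOfOrdernessMs;
    private final Map<String, Long> maxTimestamp = new HashMap<>();

    PerKeyWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called for every element; stands in for reading the element timestamp
    // inside processElement().
    public void onElement(String key, long timestampMs) {
        maxTimestamp.merge(key, timestampMs, Math::max);
    }

    // Per-key watermark: no element for this key with a smaller timestamp
    // is expected anymore.
    public long currentWatermark(String key) {
        return maxTimestamp.getOrDefault(key, Long.MIN_VALUE) - maxOutOfOrdernessMs;
    }
}
```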
> 
> On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epp...@zweitag.de 
> <mailto:stephan.epp...@zweitag.de>> wrote:
> Hey Fabian,
> 
> thank you very much. 
> 
> - yes, I would window by event time and fire/purge by processing time
> - Cheaper in the end meant that having too much state in the Flink cluster 
> would be more expensive, as we store all data in Cassandra too. I think the 
> fault tolerance would be okay, as we would do a compare-and-set with 
> Cassandra. 
> 
> With the flatMap operator, wouldn’t it be like running my own windowing 
> mechanism? I need to keep the aggregate window per sensor open (with 
> checkpointing and state management) until I receive an element for a sensor 
> that is later in time than the window's end, and then purge the state and emit 
> a new event (which is like having a watermark per sensor). Further, I need a 
> timer that fires after, say, 24 hours in case a sensor dies and doesn’t send 
> more data, which is possible with a window assigner/trigger, right? But 
> not inside normal functions, e.g. flatMap? We can guarantee that the 
> data per sensor arrives almost in order (it might be out of order within a few 
> seconds), but there might be gaps of several hours after network partitions.
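
The timeout part I mean could be sketched like this: every element pushes its key's deadline out by the timeout, and a later cleanup pass (a processing-time timer, in Flink terms) closes the state for keys whose deadline has passed. A plain-Java illustration with invented names, not framework code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Inactivity timeout per key: each element resets the key's deadline;
// expiredKeys() tells which keys should be flushed and purged now.
class InactivityTimeout {
    private final long timeoutMs;
    private final Map<String, Long> deadline = new HashMap<>();

    InactivityTimeout(long timeoutMs) { this.timeoutMs = timeoutMs; }

    public void onElement(String key, long processingTimeMs) {
        deadline.put(key, processingTimeMs + timeoutMs); // reset the timer
    }

    // Returns the keys whose state should be closed, removing their deadlines.
    public List<String> expiredKeys(long nowMs) {
        List<String> expired = new ArrayList<>();
        deadline.forEach((k, d) -> { if (d <= nowMs) expired.add(k); });
        expired.forEach(deadline::remove);
        return expired;
    }
}
```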
> 
> Is there no way to define/redefine the watermark per keyed stream? Or to 
> adjust the window assigner + trigger to achieve the desired behaviour? I am 
> a bit hesitant to implement the whole state management myself. Do you plan 
> to support such use cases on keyed streams? Maybe the WatermarkAssigner 
> could also receive information about the key for which the watermark should 
> be calculated, etc.
> 
> best, Stephan
> 
> 
> 
>> On 14 Nov 2016, at 15:17, Fabian Hueske wrote:
>> 
> 
>> Hi Stephan,
>> 
>> I'm skeptical about two things: 
>> - using processing time will result in inaccurately bounded aggregates (or 
>> do you want to group by event time in a processing-time window?)
>> - writing to and reading from Cassandra might be expensive (I'm not sure 
>> what you mean by "cheaper in the end"), and it is not integrated with 
>> Flink's checkpointing mechanism for fault tolerance.
>> 
>> To me, the stateful FlatMapOperator looks like the best approach. There is 
>> an upcoming feature for registering timers in user functions, i.e., a 
>> function is called when the timer expires. This could help to overcome the 
>> problem of closing the window when no new data arrives.
>> 
>> Best, 
>> Fabian
> 
>> 
>> 2016-11-14 8:39 GMT+01:00 Stephan Epping:
> 
>> Hello Fabian,
>> 
>> Thank you very much. What is your opinion on the following solution:
>> 
>> - Window data per time window, e.g. 15 minutes
>> - using processing time as trigger, e.g. 15 minutes
>> - which results in an aggregate over sensor values
>> - then use cassandra to select the previous aggregate (as there can be 
>> multiple for the time window due to processing time)
>> - then update the aggregate and put it into a cassandra sink again
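
The compare-and-set step in the last two bullets could look like this, simulated here against an in-memory versioned store rather than a real Cassandra driver (in CQL this would be a lightweight transaction, i.e. `UPDATE ... IF version = ?`); all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Read-merge-write with optimistic concurrency: a conditional write only
// succeeds if the row version is unchanged; otherwise re-read and retry.
class CasAggregateStore {
    static final class Row {
        final double sum;
        final long version;
        Row(double sum, long version) { this.sum = sum; this.version = version; }
    }

    private final Map<String, Row> table = new HashMap<>();

    public Row read(String windowKey) {
        return table.getOrDefault(windowKey, new Row(0.0, 0L));
    }

    // Conditional write, analogous to a Cassandra lightweight transaction.
    public boolean compareAndSet(String windowKey, long expectedVersion, double newSum) {
        Row current = read(windowKey);
        if (current.version != expectedVersion) return false;
        table.put(windowKey, new Row(newSum, expectedVersion + 1));
        return true;
    }

    // Retry loop: re-read the previous aggregate and re-merge until the
    // conditional write wins; returns the merged aggregate.
    public double addToAggregate(String windowKey, double delta) {
        while (true) {
            Row prev = read(windowKey);
            double merged = prev.sum + delta;
            if (compareAndSet(windowKey, prev.version, merged)) return merged;
        }
    }
}
```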
>> 
>> The Cassandra select will be a bit slower than using in-memory/Flink 
>> state, but will be cheaper in the end. What consequences would this design 
>> have? For example, replaying events would be more difficult, right? And 
>> what about snapshots, would they still work with this design?
>> 
>> kind regards,
>> Stephan
> 
>>> On 11 Nov 2016, at 00:39, Fabian Hueske wrote:
>>> 
> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping:
> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive 
>>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
>>> but did not know how to reply there.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see Stack Overflow: 
>>>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>>>> I am pretty new to Flink; could you elaborate on a possible solution? We 
>>>> can guarantee good ordering by sensor_id, thus watermarking by key would 
>>>> be the only reasonable way for us 
>>>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my 
>>>> own watermarking after sensorData.keyBy('id').overwriteWatermarking()... 
>>>> per key? Or maybe use custom state plus a custom trigger? What happens 
>>>> if a sensor dies or is removed completely? How can this be detected, 
>>>> since watermarks would be ignored for window garbage collection? Or could 
>>>> we dynamically schedule a job for each sensor? That would result in 
>>>> 1,000 jobs.
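
For reference, the semantics of the quoted `sensorData.keyBy('id').timeWindow(1.minute).sum('value')` pipeline, written out as plain Java over an in-memory batch so the per-key, per-window grouping is explicit. Illustrative logic only, not Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Groups (sensorId, timestampMs, value) records into per-sensor 1-minute
// tumbling windows and sums the values, keyed as "id@windowStart".
class TumblingSum {
    private static final long WINDOW_MS = 60_000L;

    public static Map<String, Double> sumPerWindow(Object[][] records) {
        Map<String, Double> result = new HashMap<>();
        for (Object[] r : records) {
            String id = (String) r[0];
            long ts = (Long) r[1];
            double value = (Double) r[2];
            long windowStart = ts - (ts % WINDOW_MS); // tumbling-window assignment
            result.merge(id + "@" + windowStart, value, Double::sum);
        }
        return result;
    }
}
```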
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
> 
>>> 
>> 
>> 
>> 
>> 