Re: Using per-window state in ProcessWindowFunction
Hello,

You can use globalState / windowState to fetch the previous state and compute incrementally. The demo below should make this easier to understand:

public class ProcessWindowFunctionDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Parallelism of 1
        env.setParallelism(1);

        // Set up the data source; it emits one element per second
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // There are only two names, XXX and YYY
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // Update the running totals of XXX and YYY elements
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // Use the current time as the timestamp
                    long timeStamp = System.currentTimeMillis();
                    // Print the data and the timestamp so the results can be verified
                    System.out.println(String.format("source,%s, %s,XXX total : %d,YYY total : %d\n", name, time(timeStamp), xxxNum, yyyNum));
                    // Emit one element together with its timestamp
                    ctx.collectWithTimestamp(new Tuple2<>(name, 1), timeStamp);
                    // Wait 1 second after each emission
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Split the data into 5-second tumbling windows, then apply a ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // Key by field f0 of the Tuple2; in this example the only keys are XXX and YYY
                .keyBy(value -> value.f0)
                // 5-second tumbling window
                .timeWindow(Time.seconds(5))
                // Count the elements of each key in the current window, then format the key,
                // the count and the window start/end times as a string for the downstream operator
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    // Custom state
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Initialize the state; its name is myState
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    public void clear(Context context) {
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
                        // Fetch the myState value for the current key from the state backend
                        KeyCount current = state.value();
                        // If myState has never been assigned, initialize it here
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable gives access to all elements of this key in the current window;
                        // to keep things simple, only the number of elements is counted
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // Update the total number of elements for the current key
                        current.count += count;
                        // Write the updated state to the backend
                        state.update(current);
                        System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(new ValueStateDescriptor<>("myGlobalState", KeyCount.class));
                        KeyCount windowValue = contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);
                        KeyCount globalValue = contextGlobalValueState.value();
                        if (globalValue == null) {
                            glo
Re: ProcessWindowFunction Parallelism
Hi Patricia,

You are using the .windowAll API, which generates a global window. This operation is inherently non-parallel, since all elements have to pass through the same operator instance, so it cannot be set to any parallelism larger than 1.

Best,
Zhanghao Chen

From: patricia lee
Sent: 26 September 2023 21:30
To: user@flink.apache.org
Subject: ProcessWindowFunction Parallelism

> Hi,
>
> Can a ProcessWindowFunction not have a parallelism greater than 1? Whenever I set it to 2, I receive the error message "The parallelism of non parallel operator must be 1."
> [...]
ProcessWindowFunction Parallelism
Hi,

Can a ProcessWindowFunction not have a parallelism greater than 1? Whenever I set it to 2, I receive the error message "The parallelism of non parallel operator must be 1."

dataKafka = Kafkasource (datasource)
    .parallelism(2)
    .rebalance();

dataKafka.windowAll(GlobalWindows.create())
    .trigger(MyCountTrigger.of(100))
    .process( new CustomTrigger())
    .setParallelism(2)   // ---> the error is thrown here
    .rebalance()
Re: How to sort Iterable in ProcessWindowFunction?
And here is the comparator function. The order of the return values 1, 0, -1 is relevant: as written below it sorts ascending, and I discovered that in the order -1, 0, 1 it sorts descending.

public static class SortEventsHandlingTime implements Comparator<Tuple4> {
    // Let's compare 2 Tuple4 objects
    public int compare(Tuple4 o1, Tuple4 o2) {
        if (Long.parseLong(o1.getField(0).toString()) > Long.parseLong(o2.getField(0).toString())) {
            return 1;
        } else if (Long.parseLong(o1.getField(0).toString()) == Long.parseLong(o2.getField(0).toString())) {
            return 0;
        } else {
            return -1;
        }
    }
}

On Mon, 7 Mar 2022 at 03:05, yidan zhao wrote:
> Collect the elements to a list, then sort, then collect out.
>
> On Thu, 3 Mar 2022 at 22:13, HG wrote:
>> Hi,
>> I need to sort the input of the ProcessWindowFunction by one of the fields of the Tuple4 that is in the Iterable.
>> Any advice as to what the best way is?
>> [...]
>>
>> On Thu, 3 Mar 2022 at 15:12, HG wrote:
>>> [...]
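The effect of the comparator's return values on sort direction can be checked outside Flink. The following is a minimal plain-Java sketch, not code from the thread: the class name `ComparatorDirectionDemo` and the use of `long[]` rows in place of `Tuple4` are assumptions made only for illustration. `Long.compare` returns a negative, zero, or positive value in the same way as the hand-written -1/0/1 branches, and `reversed()` has the same effect as swapping 1 and -1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ComparatorDirectionDemo {
    // Ascending: negative when a < b, zero when equal, positive when a > b.
    static final Comparator<long[]> ASCENDING = (a, b) -> Long.compare(a[0], b[0]);
    // Descending: the same comparison with the sign flipped.
    static final Comparator<long[]> DESCENDING = ASCENDING.reversed();

    // Sorts a copy of the rows and returns the first field of each row in sorted order.
    static List<Long> sortedFirstFields(List<long[]> rows, Comparator<long[]> order) {
        List<long[]> copy = new ArrayList<>(rows);
        copy.sort(order);
        List<Long> result = new ArrayList<>();
        for (long[] row : copy) {
            result.add(row[0]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> rows = Arrays.asList(new long[]{30L}, new long[]{10L}, new long[]{20L});
        System.out.println(sortedFirstFields(rows, ASCENDING));  // [10, 20, 30]
        System.out.println(sortedFirstFields(rows, DESCENDING)); // [30, 20, 10]
    }
}
```

Using `Long.compare` also avoids parsing each field twice per comparison, which the hand-written version does.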
Re: How to sort Iterable in ProcessWindowFunction?
For the record, so that other inexperienced people may benefit too:

List<Tuple4> inputList = new ArrayList<>();
input.forEach(inputList::add);
inputList.sort(new SortEventsHandlingTime());

for (Tuple4 in: inputList){

On Mon, 7 Mar 2022 at 03:05, yidan zhao wrote:
> Collect the elements to a list, then sort, then collect out.
>
> On Thu, 3 Mar 2022 at 22:13, HG wrote:
>> Hi,
>> I need to sort the input of the ProcessWindowFunction by one of the fields of the Tuple4 that is in the Iterable.
>> Any advice as to what the best way is?
>> [...]
>>
>> On Thu, 3 Mar 2022 at 15:12, HG wrote:
>>> [...]
Re: How to sort Iterable in ProcessWindowFunction?
Collect the elements to a list, then sort, then collect out.

On Thu, 3 Mar 2022 at 22:13, HG wrote:
> Hi,
> I need to sort the input of the ProcessWindowFunction by one of the fields of the Tuple4 that is in the Iterable.
> Any advice as to what the best way is?
> [...]
>
> On Thu, 3 Mar 2022 at 15:12, HG wrote:
>> [...]
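The "collect to a list, sort, then collect out" approach can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `Event` is a hypothetical stand-in for the `Tuple4` in the thread, a `Consumer<String>` stands in for Flink's `Collector<String>`, and treating the first element's elapsed time as 0 is a choice made here, not something the thread specifies. The output is emitted once per element, which mirrors calling `out.collect(...)` several times inside `process`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

class SortWindowContentsDemo {
    // Hypothetical stand-in for the Tuple4 used in the thread.
    static final class Event {
        final String transactionId;
        final long handlingTime;

        Event(String transactionId, long handlingTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
        }
    }

    // Copies the window contents to a list, sorts by handling time,
    // then emits one line per element with the elapsed time since the previous one.
    static void process(Iterable<Event> input, Consumer<String> out) {
        List<Event> sorted = new ArrayList<>();
        input.forEach(sorted::add);
        sorted.sort(Comparator.comparingLong((Event e) -> e.handlingTime));

        long previous = 0L;
        long totalElapsed = 0L;
        boolean first = true;
        for (Event e : sorted) {
            // Assumption: the first element has no predecessor, so its elapsed time is 0.
            long elapsed = first ? 0L : e.handlingTime - previous;
            totalElapsed += elapsed;
            out.accept(e.transactionId + " elapsed=" + elapsed + " totalElapsed=" + totalElapsed);
            previous = e.handlingTime;
            first = false;
        }
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
                new Event("tx-b", 250L), new Event("tx-a", 100L), new Event("tx-c", 400L));
        process(window, System.out::println);
    }
}
```

Copying is needed because an `Iterable` offers no sorting of its own; the whole window content is held in memory while it is sorted.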
Re: How to sort Iterable in ProcessWindowFunction?
Hi,

I need to sort the input of the ProcessWindowFunction by one of the fields of the Tuple4 that is in the Iterable.

Any advice as to what the best way is?

static class MyProcessWindowFunction extends
        ProcessWindowFunction<Tuple4, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context,
            Iterable<Tuple4> input, Collector<String> out)
    {
        Long elapsed = 0L;
        Long pHandlingTime = 0L;
        Long totalElapsed = 0L;

        System.out.println(input.getClass());

        Iterator<Tuple4> etter = input.iterator();
        for (Tuple4 in: input){
            String transactionId = in.getField(2).toString();
            elapsed = Long.parseLong(in.getField(1).toString()) - pHandlingTime;
            totalElapsed = totalElapsed + elapsed;
            pHandlingTime = Long.parseLong(in.getField(1).toString());

            out.collect("Key : " + key + " Window : " + context.window()
                    + " transactionId : " + transactionId + " elapsed : " + elapsed.toString()
                    + " max handling time : " + h.toString()
                    + " totalElapsed " + totalElapsed);
        }
    }
}

On Thu, 3 Mar 2022 at 15:12, HG wrote:
> Hi,
> I need to sort the input of the ProcessWindowFunction by one of the fields of the Tuple4 that is in the Iterable.
> [...]
How to sort Iterable in ProcessWindowFunction?
Hi,

I need to sort the input of the ProcessWindowFunction by one of the fields of the Tuple4 that is in the Iterable.

static class MyProcessWindowFunction extends
        ProcessWindowFunction<Tuple4, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context,
            Iterable<Tuple4> input, Collector<String> out)
    {
        Long elapsed = 0L;
        Long pHandlingTime = 0L;
        Long totalElapsed = 0L;

        System.out.println(input.getClass());

        Iterator<Tuple4> etter = input.iterator();
        for (Tuple4 in: input){
            String transactionId = in.getField(2).toString();
            elapsed = Long.parseLong(in.getField(1).toString()) - pHandlingTime;
            totalElapsed = totalElapsed + elapsed;
            pHandlingTime = Long.parseLong(in.getField(1).toString());

            out.collect("Key : " + key + " Window : " + context.window()
                    + " transactionId : " + transactionId + " elapsed : " + elapsed.toString()
                    + " max handling time : " + h.toString()
                    + " totalElapsed " + totalElapsed);
        }
    }
}
Re: processwindowfunction output Iterator
OK, I had the impression, obviously incorrect, that it could be called only once.

Thanks

On Tue, Mar 1, 2022, 09:26 Schwalbe Matthias wrote:
> Good morning Hans,
>
> You can call the out.collect(…) multiple times, i.e. for each forwarded event … how about this
>
> Thias
>
> From: HG
> Sent: Monday, 28 February 2022 16:25
> To: user
> Subject: processwindowfunction output Iterator
>
> Hi,
>
> Can processwindowfunction output an Iterator?
> I need to sort and subtract timestamps from keyed events and then output them all with added elapsed times.
>
> Regards Hans
RE: processwindowfunction output Iterator
Good morning Hans,

You can call the out.collect(…) multiple times, i.e. for each forwarded event … how about this

Thias

From: HG
Sent: Monday, 28 February 2022 16:25
To: user
Subject: processwindowfunction output Iterator

> Hi,
>
> Can processwindowfunction output an Iterator?
> I need to sort and subtract timestamps from keyed events and then output them all with added elapsed times.
>
> Regards Hans
processwindowfunction output Iterator
Hi, Can processwindowfunction output an Iterator? I need to sort and subtract timestamps from keyed events and then output them all with added elapsed times. Regards Hans
Re: clear() in a ProcessWindowFunction
Thank you for the confirmation. The simulations confirm too.

On Fri, Apr 9, 2021 at 11:14 AM Roman Khachatryan wrote:
> Hi Vishal,
>
> Sorry for the late reply. Please find my answers below.
> [...]
Re: clear() in a ProcessWindowFunction
Hi Vishal,

Sorry for the late reply. Please find my answers below.
By state I assume the state obtained via getRuntimeContext (access to window state is not allowed).

> The state is scoped to the key (created per key in the ProcessWindowFunction with a ttl )
Yes.

> The state will remain alive irrespective of whether the Window is closed or not (a TTL timer does the collection )
Right, but you need to configure TTL when accessing the state [1].

> The execution on a key is sequential , as in if 2 events arrive for the 2 Sessions they happen sequentially ( or in any order but without the need of synchronization )
Right.

> The state mutated by an event in Session A, will be visible to Session B if an event incident on Session B was to happen subsequently. There is no need of synchronizing access to the state as it for the same key.
Right.

Your understanding of merging of window contents is also correct.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman

On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi wrote:
> I had a query Say I have a single key with 2 live sessions ( A and B ) with a configured lateness .
> [...]
Re: clear() in a ProcessWindowFunction
I had a query. Say I have a single key with 2 live sessions (A and B) with a configured lateness.

Do these invariants hold?

* The state is scoped to the key (created per key in the ProcessWindowFunction with a TTL).
* The state will remain alive irrespective of whether the window is closed or not (a TTL timer does the collection).
* The execution on a key is sequential, as in if 2 events arrive for the 2 sessions they happen sequentially (or in any order, but without the need of synchronization).
* The state mutated by an event in Session A will be visible to Session B if an event incident on Session B was to happen subsequently. There is no need to synchronize access to the state, as it is for the same key.

What I am not sure about is what happens when session A merges with session B. I would assume that it just defines the new start and end of the merged window, GCing the old ones (or at least one of them), and assigns that event to the new window. What one does with the custom state in ProcessWindowFunction (there is a CountTrigger of 1), really what is done in the process method above, is one degree removed from whatever Flink does internally with its merges, given that the state is scoped to the key.

On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi wrote:
> Yep, makes sense.
> [...]
Re: clear() in a ProcessWindowFunction
Yep, makes sense.

On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan wrote:
> Window state is cleared (as well as the window itself), but global
> state is not (unless you use TTL).
Re: clear() in a ProcessWindowFunction
> Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?

Window state is cleared (as well as the window itself), but global state is not (unless you use TTL).

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman
Re: clear() in a ProcessWindowFunction
Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows will have their own states, and how the merge happens is really at the key level.
Re: clear() in a ProcessWindowFunction
I intend to augment every event in a session with a unique ID. To keep the session lean, there is a PurgingTrigger on this aggregate that fires on a count of 1.

> (except that the number of keys can grow).

Want to confirm that the keys are GCed (along with state) once the (window close + lateness) has passed?
Re: clear() in a ProcessWindowFunction
Hi Vishal,

There is no leak in the code you provided (except that the number of keys can grow).
But as you figured out, the state is scoped to the key, not to window+key.

Could you explain what you are trying to achieve and why you need to combine sliding windows with state scoped to window+key?

Regards,
Roman
Re: clear() in a ProcessWindowFunction
Essentially, does this code leak state?

    private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
            extends ProcessWindowFunction, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
        private static final long serialVersionUID = 1L;
        private final static ValueStateDescriptor<String> sessionId =
                new ValueStateDescriptor<>("session_uid", String.class);

        @Override
        public void process(KEY key,
                ProcessWindowFunction, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
                Iterable> elements,
                Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) throws Exception {
            // I need this scoped to key/window
            if (getRuntimeContext().getState(sessionId).value() == null) {
                UUID uuid = UUID.randomUUID();
                getRuntimeContext().getState(sessionId).update(uuid.toString());
            }
            String uuid = getRuntimeContext().getState(sessionId).value();
            out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
        }
    }
clear() in a ProcessWindowFunction
Hello folks,

The suggestion is to use windowState() for per-key, per-window state and to clear that state explicitly. It also seems that getRuntime().getState() returns state in a global scope, shared among all windows with the same key. I of course want state scoped to a key per window and was planning to use windowState(). The caveat is that my window is a Session Window, and when I try to use clear() I get this exception (Session Windows are Merging Windows):

Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.

The questions are:

* How do I have state per *session* window / per key and still be able to clear it?
* Does getRuntime().getState() give me the clear() semantics for free along with state per window per key, and have I thus understood getRuntime().getState() wrong?

Regards.
Re: How to use ProcessWindowFunction in pyflink?
Hi Hongyuan,

it seems as if PyFlink's DataStream API is still lacking window support [1], which is targeted for the next release. Examples for windows in PyFlink's Table API are available here [2].

    from pyflink.table.window import Tumble
    from pyflink.table.expressions import lit, col

    orders = t_env.from_path("Orders")
    result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
        .group_by(orders.a, col('w')) \
        .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))

[1] https://issues.apache.org/jira/browse/FLINK-21202
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/operations.html#aggregations

On Fri, Feb 19, 2021 at 8:26 AM Hongyuan Ma wrote:
> I want to be able to use processWindowFunction in a Tumble Window,
> and finally output 0 or more lines.
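How to use the full-window aggregation ProcessWindowFunction in pyflink
Greetings,

I am new to pyflink. I would like to use a processWindowFunction in a Tumble Window to run a full computation over the data in the window and finally output zero or more rows. I have looked through pyflink's DataStream API and Table API but found no complete example. pyflink's DataStream API does not seem to implement window() yet, and I am still unclear on how to use the Table API.

If I implement "public class MyProcessWindowFunct extends ProcessWindowFunction {}" in Java, package it as a jar and register it as a UDF in pyflink, can it be called through a select statement in the Table API? Can the select statement correctly return zero or more rows? If you could provide a simple pyflink processWindowFunction example, I would be very grateful.

Thanks in advance for your help!

Hongyuan Ma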
How to use ProcessWindowFunction in pyflink?
Greetings,

I am a newbie to pyflink. I want to be able to use processWindowFunction in a Tumble Window, and finally output 0 or more lines. I have checked the DataStream API and Table API of pyflink, but have not found a complete example. pyflink's DataStream API does not seem to implement window() yet, and I'm not sure how to use the Table API.

If I implement "public class MyProcessWindowFunct extends ProcessWindowFunction" in Java and register it as a UDF in Python, is it possible to call it through a select statement in pyflink? Can the select statement correctly return zero or more rows of results?

Any help will be appreciated!

Best Regards,
Hongyuan Ma
Re: What is the difference between RuntimeContext state and ProcessWindowFunction.Context state when using a ProcessWindowFunction?
Hi Marco,

1. RuntimeContext state is available to all operators and is bound to the current key (if used in a keyed context).
2. Is actually the same. Not sure why it's exposed twice... So use either one.
3. Is additionally bound to the current window.

On Tue, Feb 9, 2021 at 10:46 PM Marco Villalobos wrote:
> A ProcessWindowFunction has access to three different kinds of state.
> 1. RuntimeContext state.
> 2. ProcessWindowFunction.Context global state
> 3. ProcessWindowFunction.Context window state.
>
> Can somebody please elaborate on the differences, and when I should use
> global state vs runtime context state?
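
To make the three scopes concrete, here is a small sketch in plain Java. It is a toy model built on ordinary maps, not the Flink API: the class StateScopeModel and its methods are hypothetical names invented for illustration. It assumes, as described above, that RuntimeContext state and Context global state address the same per-key slot, while window state gets a separate slot per (key, window).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of state scoping. Plain maps, NOT the Flink API; all names are hypothetical. */
final class StateScopeModel {
    // Per-key slot: stands in for RuntimeContext state and Context global state.
    private final Map<String, Integer> keyedState = new HashMap<>();
    // Per-(key, window) slot: stands in for Context window state.
    private final Map<String, Integer> windowState = new HashMap<>();

    private static String windowSlot(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    /** Simulates one firing of a window for a key: increments both counters. */
    void fire(String key, long windowStart) {
        keyedState.merge(key, 1, Integer::sum);
        windowState.merge(windowSlot(key, windowStart), 1, Integer::sum);
    }

    /** Simulates window cleanup: only the per-window slot is dropped. */
    void expireWindow(String key, long windowStart) {
        windowState.remove(windowSlot(key, windowStart));
    }

    int keyedCount(String key) {
        return keyedState.getOrDefault(key, 0);
    }

    int windowCount(String key, long windowStart) {
        return windowState.getOrDefault(windowSlot(key, windowStart), 0);
    }

    public static void main(String[] args) {
        StateScopeModel model = new StateScopeModel();
        model.fire("a", 0L);
        model.fire("a", 0L);
        model.fire("a", 5000L);
        model.expireWindow("a", 0L);
        // The per-key counter survives the window; the per-window counter does not.
        System.out.println("keyed=" + model.keyedCount("a")
                + " window0=" + model.windowCount("a", 0L)
                + " window5000=" + model.windowCount("a", 5000L));
    }
}
```

The per-key counter keeps growing across windows, which is why per-key state needs an explicit cleanup strategy of its own when the key space is large.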
What is the difference between RuntimeContext state and ProcessWindowFunction.Context state when using a ProcessWindowFunction?
Hi,

I am having a difficult time distinguishing between RuntimeContext state and global state when using a ProcessWindowFunction.

A ProcessWindowFunction has access to three different kinds of state:
1. RuntimeContext state.
2. ProcessWindowFunction.Context global state.
3. ProcessWindowFunction.Context window state.

It's clear to me that the window state belongs to a window, but the line between RuntimeContext state and ProcessWindowFunction.Context global state seems blurred.

Can somebody please elaborate on the differences, and when I should use global state vs runtime context state?

Thank you.
Re: Why can't ProcessWindowFunction clear state in its clear method? - v1.10.1
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
Re: Why can't ProcessWindowFunction clear state in its clear method? - v1.10.1
.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
Re: Why can't ProcessWindowFunction clear state in its clear method? - v1.10.1
As I understand it, looking at the aggregate(AggregateFunction aggFunction, ProcessWindowFunction windowFunction) method, the state data of a window is held by an aggregate function. The ProcessWindowFunction only processes the window's result, so you need to obtain the corresponding window state through the context in order to clean it up.

On Tue, Aug 25, 2020 at 6:25 PM, x <35907...@qq.com> wrote:
> Windows are opened per day and fired per minute. I overrode the clear method of ProcessWindowFunction, hoping to clean up the state when the window ends, but in practice the state is not cleaned up across days. The code is as follows:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> …
> .window(TumblingEventTimeWindows.of(Time.days(1)))
> .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
> .evictor(TimeEvictor.of(Time.seconds(0), true))
> .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
>   private var state: MapState[String,Boolean] = _
>   override def open
>   override def process
>   override def clear(ctx: Context): Unit = {
>     state.clear()
>   }
> }
Why can't ProcessWindowFunction clear state in its clear method? - v1.10.1
Windows are opened per day and fired per minute. I overrode the clear method of ProcessWindowFunction, hoping to clean up the state when the window ends, but in practice the state is not cleaned up across days. The code is as follows:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    …
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
    .evictor(TimeEvictor.of(Time.seconds(0), true))
    .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
      private var state: MapState[String,Boolean] = _
      override def open
      override def process
      override def clear(ctx: Context): Unit = {
        state.clear()
      }
    }
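Re: Re: How to effectively clear state in ProcessWindowFunction
Hi,

I find it strange that your program can run at all when started without a checkpoint. Your value state descriptor defines no default value, so calling #value() returns null; yet the first call to #update still reads the value from state, so it is odd that it runs through.

I suggest debugging locally in an IDE. You could change the clear condition so that it does not wait until the next day, which lets you reproduce the problem locally.

Best,
Yun Tang

From: 守护 <346531...@qq.com>
Sent: Tuesday, March 31, 2020 16:31
To: user-zh
Subject: Re: How to effectively clear state in ProcessWindowFunction

Thank you for the reply.

The check if(stateDate.equals("") || stateDate.equals(date)) in the code does reach pv_st.clear().
1. In the final output I found that the state in pv_st had not been cleared; it kept accumulating.
2. After state.clear(), reading the value again returns null. The snippet indeed has no null check and updates directly with pv_st.update(pv_st.value() + c_st). Could that be related?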
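Re: How to effectively clear state in ProcessWindowFunction
Thank you for the reply.

The check if(stateDate.equals("") || stateDate.equals(date)) in the code does reach pv_st.clear().
1. In the final output I found that the state in pv_st had not been cleared; it kept accumulating.
2. After state.clear(), reading the value again returns null. The snippet indeed has no null check and updates directly with pv_st.update(pv_st.value() + c_st). Could that be related?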
Re: How to effectively clear state in ProcessWindowFunction
Hi,

From the code, with if(stateDate.equals("") || stateDate.equals(date)) I cannot tell where the stateDate variable gets its value, so it is unclear whether your check actually takes effect.
Also, after state.clear(), reading the value again returns null, and I do not see a check for that anywhere in the snippet.

Best,
Yun Tang

From: 守护 <346531...@qq.com>
Sent: Tuesday, March 31, 2020 12:33
To: user-zh
Subject: How to effectively clear state in ProcessWindowFunction

Hello everyone,

--Version
FLINK 1.10.0 ON YARN

--Process
1. Define a .window(TumblingProcessingTimeWindows.of(Time.days(1))) window.
2. Define a new Trigger(): .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))).
3. Define a new ProcessWindowFunction() that computes at a fixed interval. The business logic is to compute the current day's data starting at midnight and to clear it and start over the next day.

--Problem
A ValueState is created in the new ProcessWindowFunction(). I want the ValueState to be cleared at midnight of the next day so the computation starts over, but I found that the ValueState is not cleared and keeps accumulating on top of the previous day. When should the .clear() call be made for it to take effect?

--Partial code

    .window(TumblingProcessingTimeWindows.of(Time.days(1)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
    .process(new ProcessWindowFunction[(String,String,Long), String, Tuple, TimeWindow] {
      private var pv_st: ValueState[Long] = _

      override def open(parameters: Configuration): Unit = {
        pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
      }

      override def process(key: Tuple, context: Context, elements: Iterable[(String,String,Long)], out: Collector[String]): Unit = {
        var c_st = 0
        val elementsIterator = elements.iterator
        // iterate over the window data, get the unique word
        while (elementsIterator.hasNext) {
          val ac_name = elementsIterator.next()._2
          if (!ac_name.isEmpty && ac_name.equals("listentime")) {
            c_st += 1
          }
        }
        val time: Date = new Date()
        val dateFormat: SimpleDateFormat = new SimpleDateFormat("-MM-dd")
        val date = dateFormat.format(time)
        // add current
        pv_st.update(pv_st.value() + c_st)
        var jsonStr = "" + key.getField(0) + "_" + date + ""
        // start of the JSON payload
        jsonStr += "{" + "\"yesterday_foreground_play_pv\":\"" + pv_st.value() + "\"}";
        // check whether the next day has started; if so, clear all state and start accumulating again
        if (stateDate.equals("") || stateDate.equals(date)) {
          stateDate = date
          out.collect(jsonStr)
        } else {
          out.collect(jsonStr)
          pv_st.clear()
          stateDate = date
        }
      }
    })
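
One possible ordering of the daily reset, sketched in plain Java rather than against the Flink API. DailyCounter is a hypothetical class invented for illustration: its nullable count field stands in for a ValueState with no default value (which returns null after clear()), and the day field stands in for the stateDate variable from the snippet, which in a real job would presumably also have to live in keyed state. The sketch compares the day first, resets if needed, and only then accumulates, with an explicit null check on the read.

```java
/** Plain-Java stand-in for the daily-reset logic; NOT the Flink API, names are hypothetical. */
final class DailyCounter {
    private Long count;  // null models an empty ValueState (no default value, or after clear())
    private String day;  // day the current count belongs to; null before the first firing

    /** Adds n to the count for the given day, resetting first if the day has changed. */
    long add(String today, long n) {
        if (day != null && !day.equals(today)) {
            count = null; // models ValueState.clear() at the day boundary
        }
        day = today;
        long base = (count == null) ? 0L : count; // null-safe read
        count = base + n;
        return count;
    }

    public static void main(String[] args) {
        DailyCounter counter = new DailyCounter();
        System.out.println(counter.add("2020-03-30", 3));
        System.out.println(counter.add("2020-03-30", 2));
        System.out.println(counter.add("2020-03-31", 4));
    }
}
```

Resetting before accumulating means the first firing of a new day never carries the previous day's total, and the explicit null check avoids relying on how an empty state value is unboxed.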
Help needed with ProcessWindowFunction
Hi experts,

Error message:

Error:(73, 27) java: no suitable method found for process(com.linkedsee.aiops.CountErrorFunction)
    method org.apache.flink.streaming.api.datastream.DataStream.process(org.apache.flink.streaming.api.functions.ProcessFunction) is not applicable
      (cannot infer type-variable R
        (argument mismatch; com.linkedsee.aiops.CountErrorFunction cannot be converted to org.apache.flink.streaming.api.functions.ProcessFunction))
    method org.apache.flink.streaming.api.datastream.DataStream.process(org.apache.flink.streaming.api.functions.ProcessFunction,org.apache.flink.api.common.typeinfo.TypeInformation) is not applicable
      (cannot infer type-variable R
        (actual and formal argument lists differ in length))

StreamingJob.java

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.linkedsee.aiops;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

    import java.util.*;

    /**
     * Skeleton for a Flink Streaming Job.
     *
     * For a tutorial how to write a Flink streaming application, check the
     * tutorials and examples on the Flink Website (https://flink.apache.org/docs/stable/).
     *
     * To package your application into a JAR file for execution, run
     * 'mvn clean package' on the command line.
     *
     * If you change the name of the main class (with the public static void main(String[] args))
     * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
     */
    public class StreamingJob {

        public static void main(String[] args) throws Exception {
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "xxx:9092");
            properties.setProperty("auto.offset.reset", "earliest");
            properties.setProperty("group.id", "test");

            FlinkKafkaConsumer<ObjectNode> fConsumer = new FlinkKafkaConsumer<>("aiops_log",
                    new JSONKeyValueDeserializationSchema(false), properties);
            fConsumer.assignTimestampsAndWatermarks(new AssignerWatermarks());

            DataStream<ObjectNode> dataStream = env
                    .addSource(fConsumer);

            dataStream
                    .keyBy(new KeySelector<ObjectNode, Object>() {
                        @Override
                        public Object getKey(ObjectNode objectNode) throws Exception {
                            return objectNode.get("value").get("type");
                        }
                    })
                    .timeWindow(Time.seconds(60));

            dataStream.process(new CountErrorFunction());
            dataStream.print();

            /*
             * Here, you can start creating your execution plan for Flink.
             *
             * Start with getting some data from the environment, like
             *     env.readTextFile(textPath);
             *
             * then, transform the resulting DataStream using operations
             * like
             *     .filter()
             *     .flatMap()
             *     .join()
             *     .coGroup()
             *
             * and many more.
             * Have a look at the programming guide for the Java API:
             *
             * https://flink.apache.org/docs/latest/apis/streaming/index.html
             *
             */

            // execute program
            env.execute("Error Detection");
        }
    }

CountErrorFunction.java

    package com.linkedsee.aiops;

    import jdk.nashorn.internal.ir.ObjectNode;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class CountErrorFunction extends ProcessWindowFunction {

        @Override
        public void process(String s,
Re:Re: FLINK: differences in ProcessWindowFunction across StateBackends
Thanks, Mr. Tang, for the answer!

At 2020-01-07 19:46:06, "Yun Tang" wrote:
>Hi
>
>Using iterator.remove() to drop data from state that has already been computed is not a standard practice. The standard practice is to clear the corresponding state [1].
>The reason the data does get removed with MemoryStateBackend is that get returns the very object the backend stores on heap [2], so modifying it has a side effect.
>With the RocksDB state backend, get returns a deserialized list rather than the data RocksDB itself stores [3], so modifying it has no side effect.
>
>[1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>[2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
>[3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119
>
>Best,
>Yun Tang
>
>From: USERNAME
>Sent: Tuesday, January 7, 2020 17:54
>To: user-zh@flink.apache.org
>Subject: FLINK: differences in ProcessWindowFunction across StateBackends
>
>Hello everyone, and Happy New Year!
>
>--Version
>FLINK 1.9.1 ON YARN
>
>--Process
>1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window.
>2. Define a new Trigger() that computes and emits at a fixed interval.
>3. Define a new ProcessWindowFunction() that computes and emits at a fixed interval and does not keep data that has already been computed.
>
>--Problem
>In the new ProcessWindowFunction(), iter.remove() is used to drop the data that has already been computed.
>With MemoryStateBackend this achieves the expected result: within each fixed computation interval, values from the previous round are not included.
>With RocksDBStateBackend the iter.remove() step has no effect, and the next computation still contains the historical data.
>Is there a better way to handle this kind of computation?
>
>--Partial code
>final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>new ProcessWindowFunction{
>  public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
>    for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>      iter.remove();
>    }
>  }
>}
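
The difference described above can be reproduced without Flink. The following is a sketch in plain Java, not Flink code: HeapStore and SerializedStore are hypothetical classes invented for illustration, one handing out the stored list itself and the other deserializing a fresh copy on every read (using Java serialization as a stand-in for whatever serializer a real backend uses). Removing elements through the iterator only changes what is stored in the first case.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Plain-Java illustration of live object vs. deserialized copy; NOT Flink code. */
final class LiveVsCopyModel {
    interface ListStore {
        List<Integer> get();
    }

    /** Returns the stored list itself, so callers can mutate what is stored. */
    static final class HeapStore implements ListStore {
        private final List<Integer> data;
        HeapStore(List<Integer> initial) { this.data = new ArrayList<>(initial); }
        @Override public List<Integer> get() { return data; }
    }

    /** Stores bytes and deserializes a fresh list on every get(). */
    static final class SerializedStore implements ListStore {
        private final byte[] bytes;
        SerializedStore(List<Integer> initial) { this.bytes = serialize(new ArrayList<>(initial)); }
        @Override public List<Integer> get() { return deserialize(bytes); }
    }

    static byte[] serialize(ArrayList<Integer> list) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(list);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @SuppressWarnings("unchecked")
    static List<Integer> deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (List<Integer>) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Removes every element through the iterator, then reports the size a fresh get() sees. */
    static int sizeAfterIteratorRemove(ListStore store) {
        for (Iterator<Integer> it = store.get().iterator(); it.hasNext(); ) {
            it.next();
            it.remove();
        }
        return store.get().size();
    }

    public static void main(String[] args) {
        List<Integer> initial = Arrays.asList(1, 2, 3);
        System.out.println("heap store size: " + sizeAfterIteratorRemove(new HeapStore(initial)));
        System.out.println("serialized store size: " + sizeAfterIteratorRemove(new SerializedStore(initial)));
    }
}
```

Because the outcome depends on whether the caller happens to hold the live object, code that relies on such mutation is tied to one backend's behavior; clearing state through the state API does not have that dependency.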
Re:Re:Re: FLINK: differences in ProcessWindowFunction across StateBackends
TTL does not seem to support TimeCharacteristic.EventTime.

On 2020-01-08 14:17:11, "USERNAME" wrote:
> In my case I need TriggerResult.FIRE_AND_PURGE in the trigger to clear the data of the window that was just computed, so that the computation is incremental. That is a bit different from TTL.
>
> On 2020-01-07 19:51:57, "huoguo" wrote:
>> Can the stale data be expired by setting a TTL?
Re: FLINK: differences in ProcessWindowFunction across StateBackends
Can the stale data be expired by setting a TTL?

> On Jan 7, 2020, at 17:54, USERNAME wrote:
>
> Hello everyone, and Happy New Year!
>
> -- Version
> FLINK 1.9.1 ON YARN
>
> -- Setup
> 1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window.
> 2. Define a new Trigger() that fires at a fixed interval and emits output.
> 3. Define a new ProcessWindowFunction() that computes and emits at each fixed interval and does not keep data that has already been processed.
>
> -- Problem
> The ProcessWindowFunction uses iter.remove(); to drop data that has already been processed.
> With MemoryStateBackend this works as expected: each periodic computation excludes the values from the previous one.
> With RocksDBStateBackend the iter.remove(); step has no effect, and the next computation still sees the historical data.
> Is there a better way to handle this kind of computation?
>
> -- Partial code
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> new ProcessWindowFunction{
>     public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
>         for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>
>             iter.remove();
>         }
>     }
> }
Re: FLINK: differences in ProcessWindowFunction across StateBackends
Hi,

Using iterator.remove() to drop already-processed data from state is not the standard approach; the standard approach is to clear the corresponding state [1].
The data happens to be removed under MemoryStateBackend because get() returns the very object the backend stores on the heap [2], so modifying it has side effects.
With the RocksDB state backend, get() returns a deserialized list rather than the data RocksDB itself stores [3], so modifying it has no side effect.

[1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
[2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
[3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119

Best regards,
Yun Tang

From: USERNAME
Sent: Tuesday, January 7, 2020 17:54
To: user-zh@flink.apache.org
Subject: FLINK: differences in ProcessWindowFunction across StateBackends

Hello everyone, and Happy New Year!

-- Version
FLINK 1.9.1 ON YARN

-- Setup
1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window.
2. Define a new Trigger() that fires at a fixed interval and emits output.
3. Define a new ProcessWindowFunction() that computes and emits at each fixed interval and does not keep data that has already been processed.

-- Problem
The ProcessWindowFunction uses iter.remove(); to drop data that has already been processed.
With MemoryStateBackend this works as expected: each periodic computation excludes the values from the previous one.
With RocksDBStateBackend the iter.remove(); step has no effect, and the next computation still sees the historical data.
Is there a better way to handle this kind of computation?

-- Partial code
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

new ProcessWindowFunction{
    public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
        for (Iterator iter = elements.iterator(); iter.hasNext(); ) {

            iter.remove();
        }
    }
}
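The difference described in this reply can be reproduced without Flink. The following is only a minimal sketch under stated assumptions: `HeapListStore` and `SerializedListStore` are hypothetical stand-ins invented for illustration (they are not Flink classes), and Java serialization merely plays the role of whatever serializer a real backend uses. It shows why removing through the iterator changes a store that hands out its live object but not one that hands out a deserialized copy, and that an explicit clear() works for both.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy stand-ins for two kinds of state storage; NOT Flink classes. */
class BackendSideEffectDemo {

    /** Heap-style store: get() hands out the stored list itself. */
    static class HeapListStore {
        private List<String> stored = new ArrayList<>();
        void add(String value) { stored.add(value); }
        List<String> get() { return stored; }
        void clear() { stored = new ArrayList<>(); }
    }

    /** Serializing store: get() deserializes a fresh copy on every call. */
    static class SerializedListStore {
        private byte[] bytes = serialize(new ArrayList<>());
        void add(String value) {
            ArrayList<String> copy = deserialize(bytes);
            copy.add(value);
            bytes = serialize(copy);
        }
        List<String> get() { return deserialize(bytes); }
        void clear() { bytes = serialize(new ArrayList<>()); }
    }

    static byte[] serialize(ArrayList<String> list) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(list);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @SuppressWarnings("unchecked")
    static ArrayList<String> deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (ArrayList<String>) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Mimics the user code in the thread: remove every element while iterating. */
    static void removeAll(Iterable<String> elements) {
        for (Iterator<String> it = elements.iterator(); it.hasNext(); ) {
            it.next();
            it.remove();
        }
    }

    public static void main(String[] args) {
        HeapListStore heap = new HeapListStore();
        SerializedListStore serialized = new SerializedListStore();
        heap.add("a"); heap.add("b");
        serialized.add("a"); serialized.add("b");

        removeAll(heap.get());        // mutates the stored list
        removeAll(serialized.get());  // mutates only a throwaway copy

        System.out.println("heap after remove: " + heap.get().size());
        System.out.println("serialized after remove: " + serialized.get().size());

        serialized.clear();           // explicit clear affects the stored bytes
        System.out.println("serialized after clear: " + serialized.get().size());
    }
}
```

The takeaway matches the advice above: relying on mutation of the returned collection depends on the storage strategy, whereas an explicit clear (in Flink, for example by purging the window from the trigger) does not.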
FLINK: differences in ProcessWindowFunction across StateBackends
Hello everyone, and Happy New Year!

-- Version
FLINK 1.9.1 ON YARN

-- Setup
1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window.
2. Define a new Trigger() that fires at a fixed interval and emits output.
3. Define a new ProcessWindowFunction() that computes and emits at each fixed interval and does not keep data that has already been processed.

-- Problem
The ProcessWindowFunction uses iter.remove(); to drop data that has already been processed.
With MemoryStateBackend this works as expected: each periodic computation excludes the values from the previous one.
With RocksDBStateBackend the iter.remove(); step has no effect, and the next computation still sees the historical data.
Is there a better way to handle this kind of computation?

-- Partial code
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

new ProcessWindowFunction{
    public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception {
        for (Iterator iter = elements.iterator(); iter.hasNext(); ) {

            iter.remove();
        }
    }
}
Re: Firing timers on ProcessWindowFunction
Thanks Alexander,
Will do.
Cheers

On Mon, Dec 2, 2019 at 3:23 PM Alexander Fedulov wrote:
> Hi Avi,
>
> In this situation I would propose to step back and use a lower level API - ProcessFunction. You can put your window elements into the Flink-managed List state and handle clean-up/triggering and periodic state mutations exactly as needed by implementing some additional timers logic.
>
> Best regards,
>
> Alexander Fedulov | Solutions Architect
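The suggestion in this reply (buffer elements in list state, then drive clean-up and periodic mutation from timers) can be pictured with a small plain-Java simulation. This is only a sketch under stated assumptions: `PeriodicCleanupSketch`, its fields, and the simulated clock are hypothetical and are not Flink API. In a real job the buffer would be keyed `ListState`, and the timers would be registered through the function context's `timerService()` and handled in `onTimer(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java picture of "list state + self-rescheduling timer"; not Flink API. */
class PeriodicCleanupSketch {
    private final long intervalMs;
    // Stand-in for keyed list state: one buffer per key.
    private final Map<String, List<Integer>> buffer = new HashMap<>();
    // Stand-in for the timer service: firing time -> keys with a timer at that time.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    // Records what each timer observed, for inspection.
    final List<String> fired = new ArrayList<>();

    PeriodicCleanupSketch(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.intervalMs = intervalMs;
    }

    /** Like processElement: buffer the value and register the first timer for a new key. */
    void processElement(String key, int value, long now) {
        List<Integer> values = buffer.get(key);
        if (values == null) {
            values = new ArrayList<>();
            buffer.put(key, values);
            registerTimer(key, now + intervalMs);
        }
        values.add(value);
    }

    private void registerTimer(String key, long time) {
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(key);
    }

    /** Like onTimer: record the buffer size, reset the state, schedule the next tick. */
    private void onTimer(String key, long time) {
        List<Integer> values = buffer.get(key);
        fired.add(key + "@" + time + " size=" + values.size());
        values.clear();
        registerTimer(key, time + intervalMs);
    }

    /** Advances the simulated clock and fires every timer that is due. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    int buffered(String key) {
        List<Integer> values = buffer.get(key);
        return values == null ? 0 : values.size();
    }

    public static void main(String[] args) {
        PeriodicCleanupSketch sketch = new PeriodicCleanupSketch(1000);
        sketch.processElement("k", 1, 0);
        sketch.processElement("k", 2, 500);
        sketch.advanceTo(1000);
        sketch.processElement("k", 3, 1200);
        sketch.advanceTo(2500);
        System.out.println(sketch.fired);
    }
}
```

The design point is that the timer interval is independent of any window size, which is what the original question asked for; the cost is that triggering and clean-up have to be implemented by hand.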
Re: Firing timers on ProcessWindowFunction
I think the only way to do this is to add a keyed operator downstream that holds the global state. Not ideal, but I don't see any other option.

On Mon, Dec 2, 2019 at 1:43 PM Avi Levi wrote:
> Hi Vino,
> I have a global state that I need to mutate every X hours (e.g. clean that state or update its value). I thought there might be an option to set a timer using the timerService with its own time interval, detached from the window interval.
Re: Firing timers on ProcessWindowFunction
Hi Vino,
I have a global state that I need to mutate every X hours (e.g. clean that state or update its value). I thought there might be an option to set a timer using the timerService with its own time interval, detached from the window interval.

On Mon, Dec 2, 2019 at 10:59 AM vino yang wrote:
> Hi Avi,
>
> First, let's clarify: is the "timer" you mention the timer of the window, or a timer you want to register to trigger some action?
>
> Best,
> Vino
Re: Firing timers on ProcessWindowFunction
Hi Avi,

First, let's clarify: is the "timer" you mention the timer of the window, or a timer you want to register to trigger some action?

Best,
Vino

On Mon, Dec 2, 2019 at 4:11 PM, Avi Levi wrote:
> Hi,
> Is there a way to fire a timer in a ProcessWindowFunction? I would like to mutate the global state periodically.
Firing timers on ProcessWindowFunction
Hi,
Is there a way to fire a timer in a ProcessWindowFunction? I would like to mutate the global state periodically.
Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream
Hi Michael,

From the WindowTranslationTest, I did not see anything about initializing a mini-cluster. Here we are testing an operator, and the operator test harness seems to provide the necessary infrastructure. You can check whether anything is missing.

Best,
Vino

On Mon, Oct 28, 2019 at 4:51 PM, Nguyen, Michael wrote:
> Hi Vino,
>
> This is a great example, thank you!
>
> It looks like I need to instantiate a StreamExecutionEnvironment in order to get my OneInputStreamOperator. Would I need to set up a local Flink cluster using MiniClusterWithClientResource in order to use StreamExecutionEnvironment?
>
> Best,
> Michael
Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream
Hi Vino,

This is a great example, thank you!

It looks like I need to instantiate a StreamExecutionEnvironment in order to get my OneInputStreamOperator. Would I need to set up a local Flink cluster using MiniClusterWithClientResource in order to use StreamExecutionEnvironment?

Best,
Michael

From: vino yang
Date: Monday, October 28, 2019 at 1:32 AM
To: Michael Nguyen
Cc: "user@flink.apache.org"
Subject: Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

> Hi Michael,
>
> You may want to look at the `KeyedOneInputStreamOperatorTestHarness` test class.
>
> You can use `WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or `WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime` [1] (both of them call `processElementAndEnsureOutput`) as an example.
>
> [1]: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676
>
> Best,
> Vino
Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream
Hi Michael,

You may want to look at the `KeyedOneInputStreamOperatorTestHarness` test class.

You can use `WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or `WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime` [1] (both of them call `processElementAndEnsureOutput`) as an example.

[1]: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676

Best,
Vino

On Mon, Oct 28, 2019 at 3:18 PM, Nguyen, Michael wrote:
> Hello everybody,
>
> Has anyone tried testing AggregateFunction() and ProcessWindowFunction() on a KeyedDataStream? I have reviewed the testing page on Flink's official website (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html) and I am not quite sure how I could use these two functions in an .aggregate() operator for my testing.
>
> Here's how I am using the AggregateFunction (EventCountAggregate()) and ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
>
>     DataStream> ec2EventsAggregate =
>         ec2Events
>             .keyBy(t -> t.f0)
>             .timeWindow(Time.minutes(30))
>             .aggregate(new EventCountAggregate(), new CalculateWindowTotal())
>             .name("EC2 creation interval count");
>
> EventCountAggregate() counts each element in the ec2Events datastream.
>
> CalculateWindowTotal() takes the timestamp of each 30-minute window and correlates it with the number of elements counted so far for that window; it returns a Tuple2 containing the end timestamp and the element count.
>
> Thanks,
> Michael
Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream
Hello everybody,

Has anyone tried testing AggregateFunction() and ProcessWindowFunction() on a KeyedDataStream? I have reviewed the testing page on Flink's official website (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html) and I am not quite sure how I could use these two functions in an .aggregate() operator for my testing.

Here's how I am using the AggregateFunction (EventCountAggregate()) and ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:

    DataStream> ec2EventsAggregate =
        ec2Events
            .keyBy(t -> t.f0)
            .timeWindow(Time.minutes(30))
            .aggregate(new EventCountAggregate(), new CalculateWindowTotal())
            .name("EC2 creation interval count");

EventCountAggregate() counts each element in the ec2Events datastream.

CalculateWindowTotal() takes the timestamp of each 30-minute window and correlates it with the number of elements counted so far for that window; it returns a Tuple2 containing the end timestamp and the element count.

Thanks,
Michael
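Independently of the operator test harness, the counting logic of an aggregate function can be checked by calling its methods directly, because it is a plain object. The following is a minimal sketch under stated assumptions: `AggregateFunctionLike` is a local stand-in declaring the same four methods as Flink's `AggregateFunction` so that the example compiles without Flink on the classpath, and `EventCountAggregateSketch` is a hypothetical guess at what a counting aggregate like the one described might look like, not the poster's actual class.

```java
/** Local stand-in with the same four methods as Flink's AggregateFunction (assumption for this sketch). */
interface AggregateFunctionLike<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

/** Hypothetical counting aggregate: one increment per incoming element. */
class EventCountAggregateSketch implements AggregateFunctionLike<String, Long, Long> {
    @Override
    public Long createAccumulator() { return 0L; }

    @Override
    public Long add(String value, Long accumulator) { return accumulator + 1; }

    @Override
    public Long getResult(Long accumulator) { return accumulator; }

    @Override
    public Long merge(Long a, Long b) { return a + b; }
}

class AggregateSketchMain {
    /** Drives the aggregate the way a window would: create, add each element, read the result. */
    static long countAll(String... events) {
        EventCountAggregateSketch aggregate = new EventCountAggregateSketch();
        Long accumulator = aggregate.createAccumulator();
        for (String event : events) {
            accumulator = aggregate.add(event, accumulator);
        }
        return aggregate.getResult(accumulator);
    }

    public static void main(String[] args) {
        System.out.println(countAll("e1", "e2", "e3"));
    }
}
```

With the real Flink interface the same direct calls apply. The window function passed as the second argument to `.aggregate(...)` then receives an `Iterable` holding the single pre-aggregated value, so testing the two together is where a harness such as the one Vino mentions becomes useful.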
Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
Hi,

I actually created a Jira issue before posting to the mailing list. Today I added steps to reproduce, with test outcomes for different scenarios, to the repository.

Jira issue: https://issues.apache.org/jira/browse/FLINK-14197
Repository: https://github.com/loliver1234/flink-process-window-function

With best regards
Oliwer

On 02.10.2019 12:05, Fabian Hueske wrote:
> Hi Oliwer,
>
> I think you are right. There seems to be something going wrong.
> Just to clarify, you are sure that the growing state size is caused by the window operator?
>
> From your description I assume that the state size does not depend (solely) on the number of distinct keys. Otherwise, the state size would stop growing at some point.
> This would be a hint that every window leaves some state behind.
>
> AFAIK, processing time session windows are not very common. There might be a bug in the implementation.
>
> Could you create a Jira with a description of the problem?
> It would be great if you could provide a reproducible example with a data generator source.
>
> Thank you,
> Fabian
Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
Hi Oliwer,

I think you are right. There seems to be something going wrong.
Just to clarify, you are sure that the growing state size is caused by the window operator?

From your description I assume that the state size does not depend (solely) on the number of distinct keys. Otherwise, the state size would stop growing at some point.
This would be a hint that every window leaves some state behind.

AFAIK, processing time session windows are not very common. There might be a bug in the implementation.

Could you create a Jira with a description of the problem?
It would be great if you could provide a reproducible example with a data generator source.

Thank you,
Fabian

On Tue, Oct 1, 2019 at 11:18, Oliwer Kostera <o.kost...@adbglobal.com> wrote:
> Hi,
>
> I'm not sure what you mean by windowState.clear(). If I understand you correctly, it's the windowState from the ProcessWindowFunction Context, which is a KeyedStateStore. A KeyedStateStore manages registered keyed states, which I don't have, so without a descriptor I can't access any clear() method. There is no state that I manage explicitly, as you can see here:
> https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java
>
> With best regards
>
> Oliwer
Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
Hi,

I'm not sure what you mean by windowState.clear(). If I understand you correctly, it's the windowState from the ProcessWindowFunction Context, which is a KeyedStateStore. A KeyedStateStore manages registered keyed states, which I don't have, so without a descriptor I can't access any clear() method. There is no state that I manage explicitly, as you can see here:
https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java

With best regards
Oliwer

On 01.10.2019 07:48, Congxian Qiu wrote:
> Hi Oliwer,
>
> From the description, it seems the state was not cleared. Maybe you could check how many times {{windowState.clear()}} was triggered in {{WindowOperator#processElement}}, and try to figure out why the state was not cleared.
>
> Best,
> Congxian
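The test scenario in this thread sends one message per key every second with a 100 ms session gap, so each message is expected to open its own window rather than extend an existing one. A minimal plain-Java sketch of gap-based session merging illustrates that expectation. It is an illustration only: `SessionWindowSketch` is a hypothetical helper, not Flink's window assigner, and treating windows that merely touch as mergeable is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Gap-based session merging over sorted timestamps; a plain-Java illustration, not Flink code. */
class SessionWindowSketch {

    /**
     * Returns {start, end} pairs. Each element first gets its own window
     * [ts, ts + gapMs]; a window is merged into the previous one when they
     * overlap or touch. Timestamps must be sorted in ascending order.
     */
    static List<long[]> sessions(long[] sortedTimestamps, long gapMs) {
        List<long[]> windows = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] candidate = {ts, ts + gapMs};
            if (!windows.isEmpty()) {
                long[] last = windows.get(windows.size() - 1);
                if (candidate[0] <= last[1]) {
                    // Overlapping or touching: extend the existing session.
                    last[1] = Math.max(last[1], candidate[1]);
                    continue;
                }
            }
            windows.add(candidate);
        }
        return windows;
    }

    public static void main(String[] args) {
        // One message per second with a 100 ms gap: every message is its own session.
        System.out.println(sessions(new long[] {0, 1000, 2000}, 100).size());
        // Messages closer together than the gap collapse into a single session.
        System.out.println(sessions(new long[] {0, 50, 120}, 100).size());
    }
}
```

Under these assumptions the scenario creates and closes one window per message and key, which is why any per-window residue left behind would show up as steadily growing state.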
Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
Hi Oliwer,

From the description, it seems the state was not cleared. Maybe you could check how many times {{windowState.clear()}} was triggered in {{WindowOperator#processElement}}, and try to figure out why the state was not cleared.

Best,
Congxian

Oliwer Kostera wrote on Fri, Sep 27, 2019 at 4:14 PM:
> In a runtime I noticed that even though data ingestion is static - same
> keys and frequency of messages the size of the process window operator
> keeps increasing. I tried to reproduce it with minimal similar setup here:
> https://github.com/loliver1234/flink-process-window-function and was
> successful to do so.
Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
Hi all,

I'm using ProcessWindowFunction in a keyed stream with the following definition:

    final SingleOutputStreamOperator processWindowFunctionStream =
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
            .process(new CustomProcessWindowFunction())
            .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
            .name("Process window function");

My checkpointing configuration uses the RocksDB state backend with incremental checkpointing and EXACTLY_ONCE mode.

At runtime I noticed that even though data ingestion is static (same keys and same message frequency), the state size of the process window operator keeps increasing. I was able to reproduce it with a minimal, similar setup here: https://github.com/loliver1234/flink-process-window-function

Testing conditions:
* RabbitMQ source with exactly-once guarantee and 65k prefetch count
* RabbitMQ sink to collect messages
* Simple ProcessWindowFunction that only passes messages through
* Stream time characteristic set to TimeCharacteristic.ProcessingTime

Testing scenario:
* Start the Flink job and check the initial state size - State Size: 127 KB
* Start sending messages for the same 1000 unique keys every 1 s (consecutive messages for a key do not fall within the 100 ms session gap, so each message should create a new window)
* State of the process window operator keeps increasing - after 1 million messages the state was around 2 MB
* Stop sending messages and wait until the RabbitMQ queue is fully consumed and a few checkpoints have gone by
* Expected the state size to drop back to the base value, but it stayed at 2 MB
* Continued sending messages with the same keys, and the state kept growing

What I checked:
* Registration and deregistration of timers set for time windows - each registration matched its deregistration
* Checked that there are in fact no window merges
* Tried a custom Trigger that disables window merges and returns TriggerResult.FIRE_AND_PURGE from onProcessingTime - same state behavior

Tested with:
* Local Flink Mini Cluster running from the IDE
* Flink HA standalone cluster running in Docker

On our staging environment, we noticed that the state for that operator keeps increasing indefinitely, after some months reaching as much as 1.5 GB for 100k unique keys.

With best regards
Oliwer
adbglobal.com <https://www.adbglobal.com>
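The statement in the testing scenario that each message should create a new window can be checked with a small model. The following is a minimal plain-Java sketch, not Flink code: the class and method names are hypothetical, it handles a single key with ascending timestamps, and it assumes that a message arriving exactly at the end of a session still extends it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of gap-based session assignment for ONE key.
// Not Flink code; all names here are hypothetical.
final class SessionGapSketch {

    /** One session covering [start, end], where end = last timestamp + gap. */
    static final class Session {
        final long start;
        long end;
        int count = 1;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Groups ascending timestamps into sessions separated by more than gapMs. */
    static List<Session> assign(long[] sortedTimestampsMs, long gapMs) {
        List<Session> sessions = new ArrayList<>();
        Session current = null;
        for (long t : sortedTimestampsMs) {
            // Assumption: a message at exactly current.end extends the session.
            if (current != null && t <= current.end) {
                current.end = t + gapMs;
                current.count++;
            } else {
                current = new Session(t, t + gapMs);
                sessions.add(current);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // One message per second with a 100 ms gap, as in the test scenario.
        long[] onePerSecond = {0L, 1000L, 2000L, 3000L};
        System.out.println(assign(onePerSecond, 100L).size() + " sessions");
    }
}
```

With one message per second and a 100 ms gap, every message opens its own session, so the number of windows created grows linearly with the number of messages. The model says nothing about why the operator state is retained after those windows fire.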
Re: Table API and ProcessWindowFunction
Hi Flavio,

I think the reason we don't have interfaces like EventTimeObject and ProcessingTimeObject is that we don't want time attributes to be defined just anywhere. The intended place to define time attributes is the source. If we added interfaces like EventTimeObject and ProcessingTimeObject to Flink, it could raise other problems, such as whether time attributes should be generated wherever an object implementing EventTimeObject or ProcessingTimeObject appears. Such an object may exist in a source, an aggregate or even a sink.

However, I think it's a good idea to add such logic in your own code. For example, you can define a user-defined source that extracts time attributes from EventTimeObject and ProcessingTimeObject, similar to the examples in the exercises.

Best, Hequn

On Thu, Jul 11, 2019 at 4:36 PM Flavio Pompermaier wrote:
> So, why not declaring 2 general interfaces like EventTimeObject and
> ProcessingTimeObject so I can declare my objects implementing those
> interfaces and I can get the fields I need easily?
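Hequn's suggestion to keep this logic in user code could look roughly like the sketch below. It is plain Java with no Flink dependency: `EventTimeObject` is the hypothetical interface named in this thread (it is not part of Flink), and `Ride` and `timestampOf` are made-up names for illustration. Wiring the extracted timestamp into a user-defined source is left out.

```java
// Sketch of user-side time-attribute extraction; nothing here is a Flink API.
final class TimeAttributeSketch {

    /** Hypothetical interface from the proposal in this thread. */
    interface EventTimeObject {
        long getEventTime();
    }

    /** Example POJO that exposes its own event time in milliseconds. */
    static final class Ride implements EventTimeObject {
        final String userId;
        final long eventTimeMs;

        Ride(String userId, long eventTimeMs) {
            this.userId = userId;
            this.eventTimeMs = eventTimeMs;
        }

        @Override
        public long getEventTime() {
            return eventTimeMs;
        }
    }

    /**
     * Returns the record's event time if it exposes one,
     * otherwise the supplied processing time.
     */
    static long timestampOf(Object record, long processingTimeMs) {
        if (record instanceof EventTimeObject) {
            return ((EventTimeObject) record).getEventTime();
        }
        return processingTimeMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(timestampOf(new Ride("u1", 42L), now));
        System.out.println(timestampOf("no time attribute", now));
    }
}
```

Keeping the interface in application code means the decision about where timestamps are assigned stays with the source, which is the concern raised in the reply above.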
Re: Table API and ProcessWindowFunction
Only one proposal here: when working with streaming sources, you often need to define which field is the processing-time or row-time field. Right now you can define the processing or event time field by implementing DefinedProctimeAttribute or DefinedRowtimeAttribute at the source. But this only helps if you use the SQL API; with TableFunctions, for example, you don't have an easy way to get the proc/row field.
Also, in the Flink exercises [1] you use a POJO where you have to implement a getEventTime() method to retrieve the row time field.

So, why not declare two general interfaces like EventTimeObject and ProcessingTimeObject, so that I can declare my objects as implementing those interfaces and easily get the fields I need?

Best,
Flavio

[1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java

On Thu, Jul 11, 2019 at 10:01 AM Flavio Pompermaier wrote:
> Thanks Hequn, I'll give it a try!
Re: Table API and ProcessWindowFunction
Thanks Hequn, I'll give it a try!

Best, Flavio

On Thu, Jul 11, 2019 at 3:38 AM Hequn Cheng wrote:
> Processing time
> If you use a TumblingProcessingTimeWindows.of(Time.seconds(1)), for each
> record, you get the timestamp from System.currentTimeMillis(), say t, and
> w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000), and w_end =
> w_start + 1000.
Re: Table API and ProcessWindowFunction
Hi,

> Can you provide a pseudo-code example of how to implement this?

Processing time
If you use TumblingProcessingTimeWindows.of(Time.seconds(1)), then for each record you get the timestamp from System.currentTimeMillis(), say t, and compute w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000) and w_end = w_start + 1000.

Event time
If you use TumblingEventTimeWindows.of(Time.seconds(1)), then for each record you get the timestamp from the corresponding timestamp field, say t, and compute w_start and w_end the same way as above.

More examples can be found in TimeWindowTest [1].

Best, Hequn

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java

On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier wrote:
> The problem with the LATERAL JOIN (via
> a LookupableTableSource+TableFunction because I need to call that function
> using the userId a a parameter) is that I cannot know the window
> start/end..
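The window arithmetic described in Hequn's reply can be reproduced without a window operator. The following is a minimal plain-Java sketch, assuming tumbling windows aligned to the epoch plus an optional offset. The class and method names are made up for illustration, and the code is a re-derivation of the idea rather than a copy of Flink's `TimeWindow.getWindowStartWithOffset`.

```java
// Plain-Java sketch of tumbling-window bounds; names are hypothetical.
final class TumblingWindowBounds {

    /** Inclusive start of the tumbling window that contains timestampMs. */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        // floorMod keeps the remainder non-negative, also for negative timestamps.
        return timestampMs - Math.floorMod(timestampMs - offsetMs, sizeMs);
    }

    /** Exclusive end of the tumbling window that contains timestampMs. */
    static long windowEnd(long timestampMs, long offsetMs, long sizeMs) {
        return windowStart(timestampMs, offsetMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        // Processing-time variant: take "now" as the record timestamp.
        long t = System.currentTimeMillis();
        long start = windowStart(t, 0L, 1000L);
        long end = windowEnd(t, 0L, 1000L);
        System.out.println("window = [" + start + ", " + end + ")");
    }
}
```

For event time, the same two calls apply with `t` taken from the record's timestamp field instead of the system clock. The resulting start and end can then be passed as ordinary parameters to the lookup function.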
Re: Table API and ProcessWindowFunction
The problem with the LATERAL JOIN (via a LookupableTableSource+TableFunction, because I need to call that function using the userId as a parameter) is that I cannot know the window start/end. To me it's not clear how to get that from TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)...
Can you provide a pseudo-code example of how to implement this?

On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng wrote:
> From your description, it seems that you only use the window to get the
> start and end time. There are no aggregations happening. If this is the case,
> you can get the start and end time by yourself (the
> `TimeWindow.getWindowStartWithOffset()` shows how to get window start
> according to the timestamp).
>
> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier wrote:
>> The main problem I'm facing right now is that I can't find a way to pass
>> the tumbling window start/end to the LATERAL JOIN table functions (because
>> this is a parameter of the REST query).
>>
>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng wrote:
>>> Hi Flavio,
>>>
>>> Nice to hear your ideas on Table API!
>>>
>>> Could you be more specific about your requirements? A detailed scenario
>>> would be quite helpful. For example, do you want to emit multiple records
>>> through the collector or do you want to use the timer?
>>>
>>> BTW, the Table API recently introduced flatAggregate (both non-window
>>> flatAggregate and window flatAggregate), which will be included in the
>>> upcoming release 1.9. The flatAggregate can emit multiple records for a
>>> single group. More details here [1][2].
>>> Hope this can solve your problem.
>>>
>>> Best, Hequn
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>>
>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier wrote:
>>>> Hi to all,
>>>> from what I understood a ProcessWindowFunction can only be used in the
>>>> Streaming API.
>>>> Is there any plan to port them also in the Table API (in the near
>>>> future)?
>>>> I'd like to do with Table API the equivalent of:
>>>>
>>>> final DataStream events = env.addSource(src);
>>>> events.filter(e -> e.getCode() != null)
>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>     .process(new ProcessWindowFunction<..., Integer, TimeWindow>() {.});
>>>>
>>>> Best,
>>>> Flavio

--
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
Re: Table API and ProcessWindowFunction
Hi Flavio,

Thanks for your information.

From your description, it seems that you only use the window to get the start and end time; no aggregation happens. If this is the case, you can compute the start and end time yourself (`TimeWindow.getWindowStartWithOffset()` shows how to get the window start from a timestamp). To be more specific, if you use processing time, you can get the timestamp with System.currentTimeMillis() and then use it to get the window start and end with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get the timestamp from the rowtime field.

With the start and end time, you can then perform a LATERAL JOIN to enrich the information. You can add a cache in your table function to avoid contacting the REST endpoint too frequently.

Best, Hequn

On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier wrote:
> The main problem I'm facing right now is that I can't find a way to pass
> the tumbling window start/end to the LATERAL JOIN table functions (because
> this is a parameter of the REST query).
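The cache suggested in Hequn's reply could be as small as a bounded LRU map in front of the REST call. The sketch below is plain Java with no Flink dependency: `LookupCache` is a hypothetical name, the loader function stands in for the real REST client, and the key format (userId plus window start and end) is an assumption. It is not thread-safe and has no expiry, so it only fits a lookup that is called from a single thread and tolerates stale entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Bounded LRU cache in front of an expensive lookup; names are hypothetical.
final class LookupCache<K, V> {

    private final Function<K, V> loader;
    private final Map<K, V> entries;

    LookupCache(final int maxEntries, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder = true makes iteration order least-recently-used first.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value, calling the loader only on a miss. */
    V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            // A null result from the loader is not cached and is reloaded next time.
            value = loader.apply(key);
            if (value != null) {
                entries.put(key, value);
            }
        }
        return value;
    }

    public static void main(String[] args) {
        // Stand-in for the REST call; the key combines userId and window bounds.
        LookupCache<String, String> cache =
            new LookupCache<>(1000, key -> "profile-for-" + key);
        System.out.println(cache.get("user42|0|60000"));
        System.out.println(cache.get("user42|0|60000")); // served from the cache
    }
}
```

Because the window start and end are part of the key, every new window causes one miss per user, so the cache mainly helps when several records of the same user fall into the same window.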
Re: Table API and ProcessWindowFunction
Hi Hequn, thanks for your answer.
What I'm trying to do is read a stream of events that basically contains a UserId field and, every X minutes (i.e. using a time window) and for each distinct UserId key, query 3 different REST services to enrich my POJOs*.
For the moment I use a ProcessWindowFunction after the .keyBy().window(), as shown in the example in my previous mail, to contact those 3 services and enrich my object.
However, I don't like this solution because I'd like to use Flink to its full potential, so I'd like to enrich my object using LATERAL TABLEs or Async I/O.
The main problem I'm facing right now is that I can't find a way to pass the tumbling window start/end to the LATERAL JOIN table functions (they are parameters of the REST query).
Moreover, I don't know whether this use case is something that the Table API aims to solve.
* Of course this could kill the REST endpoint if the number of users is very large. Because of this, I'd like to keep the external state of the source tables as internal Flink state and then do a JOIN on the UserId. The problem here is that I need to "materialize" them using Debezium (or similar) via Kafka and dynamic tables. Is there any example of keeping multiple tables in sync with Flink state through Debezium (without having to rewrite all the logic for managing UPDATE/INSERT/DELETE)?
In reply to Hequn Cheng's message of Mon, Jul 8, 2019 at 3:55 PM.
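One way to limit the load on the REST endpoint is a small in-memory cache in front of the lookup. The sketch below is plain Java and makes several assumptions: `EnrichmentCache` is a hypothetical name, the key format ("userId@windowStart") is made up for illustration, and the loader stands in for the real REST call. It is not thread-safe, so it assumes one cache per function instance.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical LRU cache placed in front of an expensive lookup (e.g. a REST call).
final class EnrichmentCache<K, V> {
    private final Function<K, V> loader;  // stands in for the real REST query
    private final Map<K, V> entries;
    private int loads = 0;                // number of times the loader was invoked

    EnrichmentCache(int maxEntries, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder=true makes iteration order least-recently-used first.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value, calling the loader only on a miss. */
    V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key);
            loads++;
            entries.put(key, value);
        }
        return value;
    }

    int loadCount() {
        return loads;
    }

    public static void main(String[] args) {
        EnrichmentCache<String, String> cache =
                new EnrichmentCache<>(1000, key -> "profile-of-" + key);
        cache.get("user42@120000");
        cache.get("user42@120000"); // served from the cache
        System.out.println("loader calls: " + cache.loadCount());
    }
}
```

Including the window start in the key means a cached answer is reused only within the same window, which matches the case where the window bounds are parameters of the REST query.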
Re: Table API and ProcessWindowFunction
Hi Flavio,
Nice to hear your ideas on the Table API!
Could you be more specific about your requirements? A detailed scenario would be quite helpful. For example, do you want to emit multiple records through the collector, or do you want to use timers?
BTW, the Table API recently introduced flatAggregate (both non-window flatAggregate and window flatAggregate), which will be included in the upcoming release 1.9. flatAggregate can emit multiple records for a single group. More details here [1][2].
Hope this can solve your problem.
Best, Hequn
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
In reply to Flavio Pompermaier's message of Mon, Jul 8, 2019 at 6:27 PM.
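To illustrate what "emit multiple records for a single group" means, here is a plain-Java model of a top-2 aggregation: values are accumulated one at a time and the result is two output records instead of one. This is only a model of the idea; `Top2Accumulator` and its methods are hypothetical and are not the Flink table aggregation API, for which the linked documentation is the reference.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an aggregation that emits several records per group.
final class Top2Accumulator {
    private Integer first;   // largest value seen so far, null if none
    private Integer second;  // second largest value seen so far, null if none

    /** Folds one input value of the group into the accumulator. */
    void accumulate(int value) {
        if (first == null || value > first) {
            second = first;
            first = value;
        } else if (second == null || value > second) {
            second = value;
        }
    }

    /** Emits up to two records for the group, one per rank. */
    List<String> emit() {
        List<String> out = new ArrayList<>();
        if (first != null) {
            out.add("rank=1,value=" + first);
        }
        if (second != null) {
            out.add("rank=2,value=" + second);
        }
        return out;
    }

    public static void main(String[] args) {
        Top2Accumulator acc = new Top2Accumulator();
        for (int v : new int[] {3, 9, 5}) {
            acc.accumulate(v);
        }
        System.out.println(acc.emit()); // [rank=1,value=9, rank=2,value=5]
    }
}
```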
Table API and ProcessWindowFunction
Hi to all,
from what I understand, a ProcessWindowFunction can only be used in the Streaming API. Is there any plan to port it to the Table API as well (in the near future)?
I'd like to do with the Table API the equivalent of:
    final DataStream events = env.addSource(src);
    events.filter(e -> e.getCode() != null)
        .keyBy(event -> Integer.valueOf(event.getCode()))
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .process(new ProcessWindowFunction() {.});
Best,
Flavio
Re: processWindowFunction
Maybe the usage of that function has changed; now I have to use it as shown here [1].
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
In reply to vino yang's message of Mon, Aug 20, 2018 at 5:56.
Re: processWindowFunction
Hi antonio,
Oh, it is a pity if you can't use KeyedProcessFunction. In that case you can use MapState, with the map key storing the key of your partition. But I am not sure whether this will achieve the effect you want.
Thanks, vino.
In reply to antonio saldivar's message of Mon, Aug 20, 2018 at 4:32 PM.
Re: processWindowFunction
Hello,
Thank you for the information. For some reason KeyedProcessFunction is not found in my Flink version, 1.4.2; I can only find ProcessFunction, which I use like this:
    public class TxnProcessFn extends ProcessFunction<Object, Object> {
        // Keyed state handles, initialized in open()
        private transient ValueState<Object> state1;
        private transient ValueState<Object> state2;
        private transient ValueState<Object> state3;
        @Override
        public void open(Configuration parameters) throws Exception {
            state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState1", Object.class));
            state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState2", Object.class));
            state3 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState3", Object.class));
        }
        @Override
        public void processElement(Object obj, Context ctx, Collector<Object> out) throws Exception {
            // TODO Auto-generated method stub
            // Assumption: state1 is the handle meant by "state" in the original post
            Object current = state1.value();
            if (current == null) {
                current = new Object();
                current.id = obj.id();
            }
        }
    }
In reply to vino yang's message of Mon, Aug 20, 2018 at 2:24.
Re: processWindowFunction
Hi antonio,
First, I suggest you use KeyedProcessFunction if you have an operation similar to keyBy. The implementation is similar to a fixed window: you can create three state collections (for example, one per time threshold) and check for each element which collections its timestamp belongs to. When the trigger fires, the elements in the collections are evaluated.
Thanks, vino.
In reply to antonio saldivar's message of Mon, Aug 20, 2018 at 11:54 AM.
Re: processWindowFunction
Thank you for the references.
I now have my ProcessFunction and can get the state, but how do I group the elements by the threshold times? Also, since this is a global window, how do I purge it? Otherwise it is going to keep growing.
In reply to vino yang's message of Sun, Aug 19, 2018 at 8:57.
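On the purging question, one possible approach is to evict entries older than the largest threshold every time a new element arrives, so the per-key history stays bounded. The following is a plain-Java sketch of that idea only: `RollingHistory` is a hypothetical class, amounts are stored in cents, and elements are assumed to arrive in timestamp order. In a Flink job the buffer would live in keyed state rather than in a field.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-key buffer that drops entries older than a retention period.
final class RollingHistory {
    private final long retentionMs;
    // Each entry is {timestampMs, amountCents}; oldest entry first.
    private final Deque<long[]> entries = new ArrayDeque<>();

    RollingHistory(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Adds an element and purges everything that has fallen out of the retention period. */
    void add(long timestampMs, long amountCents) {
        entries.addLast(new long[] {timestampMs, amountCents});
        long cutoffMs = timestampMs - retentionMs;
        while (!entries.isEmpty() && entries.peekFirst()[0] <= cutoffMs) {
            entries.removeFirst();
        }
    }

    int size() {
        return entries.size();
    }

    long sumCents() {
        long sum = 0;
        for (long[] entry : entries) {
            sum += entry[1];
        }
        return sum;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        RollingHistory history = new RollingHistory(60 * minute); // keep 60 minutes
        history.add(0, 100);
        history.add(30 * minute, 50);
        history.add(61 * minute, 70); // the entry at minute 0 is purged here
        System.out.println(history.size() + " entries, sum=" + history.sumCents());
    }
}
```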
Re: processWindowFunction
Hi antonio,
Regarding your scenario, I think you could consider using a ProcessFunction (or a keyed ProcessFunction) directly on the stream [1]. It can handle each of your elements with a timer, and you can combine it with Flink's state API [2] to store your data.
[1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state
Thanks, vino.
In reply to antonio saldivar's message of Sun, Aug 19, 2018 at 10:18 AM.
Re: processWindowFunction
Hi Vino,
Is it possible to use a global window and set the trigger's onElement to compare the element that has arrived with, for example, the last 10, 20 and 60 minutes of data?
I have rules that evaluate the sum of the amount over 10, 20 or 60 minutes for the same keyed element. If, for the same id, the sum reaches something like $200 within those thresholds and the count is greater than or equal to 3, I need to set some values on the object. If the object does not reach those thresholds, I do not set the values. Either way I keep sending the output, with or without those values.
I just want to process each object on the fly and send the output.
In reply to vino yang's message of Fri, Aug 17, 2018 at 22:14.
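The rules described in this message can be written down as a small plain-Java check. This is a sketch under assumptions: `ThresholdRules` and `Txn` are hypothetical names, "sum like $200" is read as "sum of at least 200", and the history passed in is assumed to already include the element that has just arrived.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical rule check: for which look-back periods does one id reach the thresholds?
final class ThresholdRules {
    static final long[] WINDOWS_MINUTES = {10, 20, 60};

    /** One transaction of a single id. */
    static final class Txn {
        final long timestampMs;
        final double amount;

        Txn(long timestampMs, double amount) {
            this.timestampMs = timestampMs;
            this.amount = amount;
        }
    }

    /** Returns the look-back periods (in minutes) whose sum and count thresholds are both met. */
    static List<Long> matchingWindows(List<Txn> history, long nowMs, double minSum, int minCount) {
        List<Long> matched = new ArrayList<>();
        for (long minutes : WINDOWS_MINUTES) {
            long cutoffMs = nowMs - minutes * 60_000L;
            double sum = 0;
            int count = 0;
            for (Txn txn : history) {
                // Only transactions inside (cutoff, now] count for this period.
                if (txn.timestampMs > cutoffMs && txn.timestampMs <= nowMs) {
                    sum += txn.amount;
                    count++;
                }
            }
            if (sum >= minSum && count >= minCount) {
                matched.add(minutes);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Txn> history = new ArrayList<>();
        history.add(new Txn(0, 100));
        history.add(new Txn(12 * minute, 60));
        history.add(new Txn(15 * minute, 50));
        history.add(new Txn(18 * minute, 50)); // the element that just arrived
        System.out.println(matchingWindows(history, 18 * minute, 200.0, 3)); // [20, 60]
    }
}
```

In the example, the last 10 minutes hold three transactions but only 160 in total, so only the 20 and 60 minute rules match. The caller would set the extra values on the object when the returned list is non-empty and forward the object in either case.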
Re: processWindowFunction
Hi antonio,
Yes, ProcessWindowFunction is a very low-level window function. It gives you access to the data in the window and lets you customize the window's output. So while it gives you flexibility, you also need to take care of other things yourself, which may require you to write more processing logic.
Generally speaking, sliding windows contain some repeated data, because consecutive windows overlap; a common pattern is to apply a reduce function to them to get your calculation results. If you only forward the data, there will definitely be some duplication.
Thanks, vino.
In reply to antonio saldivar's message of Fri, Aug 17, 2018 at 12:01 PM.
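Why forwarding raw elements from sliding windows produces duplicates can be seen from the window assignment itself: each element belongs to size / slide windows. The plain-Java sketch below computes the start times of all sliding windows that contain a timestamp. `SlidingAssigner` is a hypothetical name, the offset is assumed to be zero, and the loop is assumed to follow the same logic Flink's sliding window assigners use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which sliding windows does one element fall into?
final class SlidingAssigner {
    private SlidingAssigner() {}

    /** Start times of all windows of length sizeMs, sliding by slideMs, that contain timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that contains the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-second windows sliding every 5 seconds: each element is in two windows.
        System.out.println(windowStarts(27_000L, 10_000L, 5_000L)); // [25000, 20000]
    }
}
```

An element emitted unchanged by every window it belongs to therefore reaches the downstream operator size / slide times, which is the duplication described above.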
Re: processWindowFunction
Hi Vino,
Thank you for the information. Actually, I am using a trigger alert and a ProcessWindowFunction to send my results, but when my window slides or ends it sends the objects again and I am getting duplicated data.
In reply to vino yang's message of Thu, Aug 16, 2018 at 22:05.
Re: processWindowFunction
Hi Antonio,

Which results do you not want to get when each window is created? Examples of the use of ProcessWindowFunction are included in many test files in Flink's project, such as SideOutputITCase.scala or WindowTranslationTest.scala.

For more information on ProcessWindowFunction, you can refer to the official website.[1]

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction

Thanks, vino.

antonio saldivar wrote on Fri, 17 Aug 2018 at 6:24 AM:
> Hello
>
> I am implementing a data stream where I use sliding windows, but I am stuck
> because I need to set values on my object based on some if statements in my
> process function and send the object to the next step, but I don't want
> results every time a window is created.
>
> If anyone has a good example of this, it would help me.
processWindowFunction
Hello

I am implementing a data stream where I use sliding windows, but I am stuck because I need to set values on my object based on some if statements in my process function and send the object to the next step, but I don't want results every time a window is created.

If anyone has a good example of this, it would help me.
ProcessWindowFunction
I read the documentation on ProcessWindowFunction, but the code in the Flink demo is incorrect:

public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {

Tuple cannot take two type parameters. I tried to find a demo that uses ProcessWindowFunction in a windowed word count, but I cannot find even one complete, correct demo with ProcessWindowFunction. Can anyone show me how to use ProcessWindowFunction in a word count window function with .process(ProcessWindowFunction)?

-- Yuanjun Miao
Re: Access to time in aggregation, or aggregation in ProcessWindowFunction?
Hi William,

I'm not quite sure what you are trying to achieve. What constitutes a "new event"? Is this based on some key? If so, you may group on that key, create a window, and use a custom trigger [1] instead: there you can react in onElement(), set up an event-time timer for the first element, and then react in onEventTime() for your timeout. Depending on the details, though, a ProcessFunction [2] (without a window) looks like a better solution.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html

On Tuesday, 20 June 2017 12:52:38 CEST William Saar wrote:
> Hi,
> I am looking to implement a window that sends out updates for each new
> event it receives and also when an expiration timer fires and purges
> the window (the expiration time can be determined from a timestamp in
> the first event).
>
> I can't figure out a way to do this that does not require preserving
> all events in the window. It seems I would either need to be able to
> check the current watermark when an aggregation or its window function
> is evaluated, to be able to fire the final update when the timer fires,
> or I would need the ProcessWindowFunction (where I do have access to
> the time) to not preserve all elements in the window.
>
> The only way I've come up with to implement this is to use a
> ProcessWindowFunction that keeps state to only send out updates for
> new elements in the elements iterable. The ProcessWindowFunction then
> also sends out an update when the first element's timestamp meets the
> expiration condition, or if the elements iterable does not contain any
> new elements (deducing that the processing must have been triggered by
> a timer invocation and not by a new element). Is there a better way to
> do this?
>
> Thanks,
> William
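The idea behind Nico's ProcessFunction suggestion (keep only an aggregate per key, emit on every element, and emit once more when a timer set from the first element fires) can be modeled in plain Java. This is a sketch under stated assumptions, not Flink code: ExpiringAggregator, onElement, onWatermark and the string output format are hypothetical names chosen for illustration, and the aggregate is assumed to be a simple count. In a real job the per-key state and the timer would be managed by Flink rather than by a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ExpiringAggregator {

    /** Per-key state: a running count plus the expiration time fixed by the first event. */
    private static final class State {
        long count;
        final long expiresAt;

        State(long expiresAt) {
            this.expiresAt = expiresAt;
        }
    }

    private final long timeoutMillis;
    private final Map<String, State> stateByKey = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    public ExpiringAggregator(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called once per event: updates the aggregate and emits an update immediately. */
    public void onElement(String key, long timestamp) {
        // Only the first event of a key creates state and therefore sets the expiration time.
        State state = stateByKey.computeIfAbsent(key, k -> new State(timestamp + timeoutMillis));
        state.count++;
        output.add("update " + key + "=" + state.count);
    }

    /** Called as time advances: emits a final result for each expired key and purges its state. */
    public void onWatermark(long watermark) {
        Iterator<Map.Entry<String, State>> it = stateByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, State> entry = it.next();
            if (watermark >= entry.getValue().expiresAt) {
                output.add("final " + entry.getKey() + "=" + entry.getValue().count);
                it.remove();
            }
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ExpiringAggregator aggregator = new ExpiringAggregator(10_000L); // assumed 10 s timeout
        aggregator.onElement("a", 1_000L);  // first event: expiration set to 11_000
        aggregator.onElement("a", 4_000L);  // later event: only the count changes
        aggregator.onWatermark(5_000L);     // nothing has expired yet
        aggregator.onWatermark(11_000L);    // expiration reached: final result, state purged
        aggregator.output().forEach(System.out::println);
    }
}
```

No individual events are retained here, only the count and the expiration time, which is the property William is asking for.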
Access to time in aggregation, or aggregation in ProcessWindowFunction?
Hi,

I am looking to implement a window that sends out updates for each new event it receives and also when an expiration timer fires and purges the window (the expiration time can be determined from a timestamp in the first event).

I can't figure out a way to do this that does not require preserving all events in the window. It seems I would either need to be able to check the current watermark when an aggregation or its window function is evaluated, to be able to fire the final update when the timer fires, or I would need the ProcessWindowFunction (where I do have access to the time) to not preserve all elements in the window.

The only way I've come up with to implement this is to use a ProcessWindowFunction that keeps state to only send out updates for new elements in the elements iterable. The ProcessWindowFunction then also sends out an update when the first element's timestamp meets the expiration condition, or if the elements iterable does not contain any new elements (deducing that the processing must have been triggered by a timer invocation and not by a new element). Is there a better way to do this?

Thanks,
William