Re: Using per-window state in ProcessWindowFunction

2024-04-12 Thread gongzhongqiang
Hello,

You can use globalState / windowState to retrieve the previous state and compute incrementally.

The following demo should make this easier to understand:

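public class ProcessWindowFunctionDemo {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time (setStreamTimeCharacteristic is deprecated since Flink 1.12)
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Parallelism of 1
        env.setParallelism(1);
        // Source: emits one element per second, alternating between the names XXX and YYY
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new
                SourceFunction<Tuple2<String, Integer>>() {
            // Set to false by cancel() so that run() can terminate
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx)
                    throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; running && i < Integer.MAX_VALUE; i++) {
                    // There are only two names: XXX and YYY
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // Update the running totals of XXX and YYY elements
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // Use the current time as the timestamp
                    long timeStamp = System.currentTimeMillis();
                    // Print the element and its timestamp to verify the data;
                    // time(...) is a formatting helper not shown in the original message
                    System.out.println(String.format("source,%s, %s,XXX total : %d,YYY total : %d\n",
                            name,
                            time(timeStamp),
                            xxxNum,
                            yyyNum));
                    // Emit an element with the timestamp attached
                    ctx.collectWithTimestamp(new Tuple2<>(name, 1), timeStamp);
                    // Wait 1 second after each emission
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });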

        // Split the data into 5-second tumbling windows, then apply a ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // Key by field f0 of the Tuple2; in this example there are only two keys, XXX and YYY
                .keyBy(value -> value.f0)
                // 5-second tumbling window (timeWindow is deprecated since Flink 1.12;
                // newer code uses .window(TumblingProcessingTimeWindows.of(Time.seconds(5))))
                .timeWindow(Time.seconds(5))
                // Count the elements of each key in the current window, then format the key,
                // the count, and the window start/end times as a string for the downstream operator
                .process(new ProcessWindowFunction<Tuple2<String, Integer>,
                        String, String, TimeWindow>() {
                    // Custom keyed state
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws
                            Exception {
                        // Initialize the state; its name is myState
                        state = getRuntimeContext().getState(new
                                ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    // Called when the window expires; per-window state must be cleaned up here
                    @Override
                    public void clear(Context context) {
                        ValueState<KeyCount> contextWindowValueState =
                                context.windowState().getState(new ValueStateDescriptor<>("myWindowState",
                                        KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context,
                                        Iterable<Tuple2<String, Integer>> iterable,
                                        Collector<String> collector) throws Exception {
                        // Fetch the myState state of the current key from the state backend
                        KeyCount current = state.value();
                        // If myState has never been assigned, initialize it here
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable gives access to all elements of this key in the current window;
                        // for simplicity, only the number of elements is counted here
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // Update the total element count of the current key
                        current.count += count;
                        // Write the updated state back to the backend
                        state.update(current);
                        System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));
                        ValueState<KeyCount> contextWindowValueState =
                                context.windowState().getState(new ValueStateDescriptor<>("myWindowState",
                                        KeyCount.class));
                        ValueState<KeyCount> contextGlobalValueState =
                                context.globalState().getState(new ValueStateDescriptor<>("myGlobalState",
                                        KeyCount.class));
                        KeyCount windowValue =
                                contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);

                        KeyCount globalValue =
                                contextGlobalValueState.value();
                        if (globalValue == null) {
                            glo
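
A minimal, Flink-free Java model of the two scopes used in the demo may help (all names below are hypothetical, not Flink API). globalState() is scoped to the key only, so it accumulates across all windows of that key. windowState() is scoped to the key and the window, so it only survives across repeated firings of the same window (for example, late firings under allowed lateness) and has to be dropped in clear().

```java
import java.util.HashMap;
import java.util.Map;

// Flink-free model (hypothetical names) of the two state scopes exposed by
// ProcessWindowFunction.Context: globalState() is scoped to the key only,
// while windowState() is scoped to the key AND the window.
public class StateScopeModel {
    // A window identified by its [start, end) bounds, like TimeWindow.
    public record WindowId(long start, long end) {}

    // Composite scope used for per-window state.
    public record KeyWindow(String key, WindowId window) {}

    // key -> number of firings for that key across all of its windows
    public final Map<String, Integer> globalState = new HashMap<>();
    // (key, window) -> number of firings of that particular window
    public final Map<KeyWindow, Integer> windowState = new HashMap<>();

    // Models one invocation of process() for the given key and window.
    public void fire(String key, WindowId window) {
        globalState.merge(key, 1, Integer::sum);
        windowState.merge(new KeyWindow(key, window), 1, Integer::sum);
    }

    // Models clear(), invoked when the window expires. Only the per-window
    // entry is dropped; the key-scoped entry survives (unless a TTL removes it).
    public void clear(String key, WindowId window) {
        windowState.remove(new KeyWindow(key, window));
    }
}
```

In the demo above, processing-time tumbling windows normally fire only once each, so myWindowState starts from null on every firing while myGlobalState keeps growing.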

Re: ProcessWindowFunction Parallelism

2023-09-26 Thread Chen Zhanghao
Hi Patricia,

You are using the .windowAll API, which generates a global window. This
operation is inherently non-parallel, since all elements have to pass through
the same operator instance, so it cannot be set to any parallelism greater than 1.

Best,
Zhanghao Chen
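
As a rough illustration of the usual way around this: keying the stream first (keyBy(...).window(...) instead of windowAll(...)) lets the window operator run with a parallelism greater than 1, but it also changes the semantics, because the count trigger then counts per key rather than over the whole stream. The Flink-free Java model below (hypothetical names, simplified key routing) shows that effect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free model (hypothetical, not Flink API) of a keyed GlobalWindows +
// count trigger + purge pipeline running with `parallelism` instances.
public class KeyedCountBatches {
    private final int batchSize;
    private final int parallelism;
    // Per parallel instance: record key -> elements buffered in that key's window.
    private final List<Map<String, List<String>>> instances = new ArrayList<>();
    // Every fired batch, labelled with the instance and key that produced it.
    public final List<String> emitted = new ArrayList<>();

    public KeyedCountBatches(int batchSize, int parallelism) {
        this.batchSize = batchSize;
        this.parallelism = parallelism;
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
    }

    public void add(String key, String value) {
        // Simplified routing; Flink really maps keys to subtasks through key groups.
        int instance = Math.floorMod(key.hashCode(), parallelism);
        List<String> buffer = instances.get(instance)
                .computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() == batchSize) {
            // Fire for this key only, then purge its buffer.
            emitted.add("instance " + instance + " key " + key + " -> " + buffer);
            buffer.clear();
        }
    }
}
```

If a batch really must contain 100 elements of the whole stream regardless of key, the windowAll operator has to stay at parallelism 1; only the operators before and after it can run in parallel.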


ProcessWindowFunction Parallelism

2023-09-26 Thread patricia lee
Hi,

Can a ProcessWindowFunction not have a parallelism greater than 1? Whenever I
set it to 2, I get the error message "The parallelism of non
parallel operator must be 1."

dataKafka = Kafkasource (datasource)
        .setParallelism(2)
        .rebalance();

dataKafka.windowAll(GlobalWindows.create()).trigger(MyCountTrigger.of(100))
        .process(new CustomTrigger())
        .setParallelism(2) // <-- the error is thrown here
        .rebalance()


Re: How to sort Iterable in ProcessWindowFunction?

2022-03-07 Thread HG
And here is the comparator function.

The order of the return values 1, 0, -1 matters: with -1, 0, 1 in that order
instead, it sorts in descending order, as I discovered.

public static class SortEventsHandlingTime implements
        Comparator<Tuple4<?, ?, ?, ?>> {

    // Compare two Tuple4 objects by field 0, parsed as a long
    // (the Tuple4 field types are not shown in the archived message, hence the wildcards)
    @Override
    public int compare(Tuple4<?, ?, ?, ?> o1, Tuple4<?, ?, ?, ?> o2) {
        long v1 = Long.parseLong(o1.getField(0).toString());
        long v2 = Long.parseLong(o2.getField(0).toString());
        if (v1 > v2) {
            return 1;
        } else if (v1 == v2) {
            return 0;
        } else {
            return -1;
        }
    }
}
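
The same ascending order can be expressed with the standard java.util.Comparator helpers, which avoids hand-writing the 1/0/-1 branches; reversed() then gives the descending order. In this sketch a small Row record is a hypothetical stand-in for Flink's Tuple4, so the example runs without Flink. Parsing to long matters: compared as plain strings, "100" would sort before "9".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ComparatorDemo {
    // Hypothetical stand-in for a Tuple4 whose field 0 holds a numeric string.
    public record Row(String f0, String f1) {}

    // Ascending by field 0 parsed as a long: same result as returning
    // 1 / 0 / -1 for greater / equal / smaller.
    public static final Comparator<Row> ASCENDING =
            Comparator.comparingLong(r -> Long.parseLong(r.f0()));

    // reversed() flips the comparison, giving descending order.
    public static final Comparator<Row> DESCENDING = ASCENDING.reversed();

    // Sorts a copy so the input list (possibly immutable) is left untouched.
    public static List<Row> sorted(List<Row> rows, Comparator<Row> cmp) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(cmp);
        return copy;
    }
}
```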


Re: How to sort Iterable in ProcessWindowFunction?

2022-03-07 Thread HG
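For the record, so that other inexperienced people may benefit too:

// Copy the window's Iterable into a list, sort it, then iterate over the sorted list
// (the Tuple4 field types are not shown in the archived message, hence the wildcards)
List<Tuple4<?, ?, ?, ?>> inputList = new ArrayList<>();

input.forEach(inputList::add);
inputList.sort(new SortEventsHandlingTime());

for (Tuple4<?, ?, ?, ?> in : inputList) {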


Re: How to sort Iterable in ProcessWindowFunction?

2022-03-06 Thread yidan zhao
Collect the elements into a list, sort the list, then emit them with out.collect().
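
A generic version of this recipe, as a minimal sketch with a hypothetical helper name: copy the Iterable into a List (an Iterable cannot be sorted in place), sort the copy, and then loop over it, calling out.collect(...) for each element.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class IterableSort {
    private IterableSort() {}

    // Copies the window's Iterable into a List (Iterable itself has no sort)
    // and sorts the copy with the given comparator.
    public static <T> List<T> sortedCopy(Iterable<T> input, Comparator<? super T> cmp) {
        List<T> list = new ArrayList<>();
        input.forEach(list::add);
        list.sort(cmp);
        return list;
    }
}
```

Inside process(), the for loop would then iterate over sortedCopy(input, new SortEventsHandlingTime()) instead of over input. The copy holds the whole window in memory, which is usually acceptable because a ProcessWindowFunction already buffers every element of the window.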


Re: How to sort Iterable in ProcessWindowFunction?

2022-03-03 Thread HG
Hi,
I need to sort the input of the ProcessWindowFunction by one of the
fields of the Tuple4 elements in the Iterable.

Any advice on the best way to do this?

static class MyProcessWindowFunction extends
        ProcessWindowFunction<Tuple4<?, ?, ?, ?>, String, String, TimeWindow> {
    // (The Tuple4 field types are not shown in the archived message.)
    @Override
    public void process(String key, Context context,
                        Iterable<Tuple4<?, ?, ?, ?>> input, Collector<String> out)
    {
        Long elapsed       = 0L;
        Long pHandlingTime = 0L;
        Long totalElapsed  = 0L;

        System.out.println(input.getClass());

        Iterator<Tuple4<?, ?, ?, ?>> etter =
                input.iterator();
        for (Tuple4<?, ?, ?, ?> in : input) {
            transactionId = in.getField(2).toString();
            elapsed       = Long.parseLong(in.getField(1).toString()) -
                    pHandlingTime;
            totalElapsed  = totalElapsed + elapsed;
            pHandlingTime = Long.parseLong(in.getField(1).toString());

            out.collect("Key : " + key + " Window : " +
                    context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
                    elapsed.toString() + "  max handling time : " + h.toString() +
                    "  totalElapsed " + totalElapsed);
        }
    }
}


How to sort Iterable in ProcessWindowFunction?

2022-03-03 Thread HG
Hi,
I need to sort the input of the ProcessWindowFunction by one of the
fields of the Tuple4 elements in the Iterable.

static class MyProcessWindowFunction extends
        ProcessWindowFunction<Tuple4<?, ?, ?, ?>, String, String, TimeWindow> {
    // (The Tuple4 field types are not shown in the archived message.)
    @Override
    public void process(String key, Context context,
                        Iterable<Tuple4<?, ?, ?, ?>> input, Collector<String> out)
    {
        Long elapsed       = 0L;
        Long pHandlingTime = 0L;
        Long totalElapsed  = 0L;

        System.out.println(input.getClass());

        Iterator<Tuple4<?, ?, ?, ?>> etter =
                input.iterator();
        for (Tuple4<?, ?, ?, ?> in : input) {
            transactionId = in.getField(2).toString();
            elapsed       = Long.parseLong(in.getField(1).toString()) -
                    pHandlingTime;
            totalElapsed  = totalElapsed + elapsed;
            pHandlingTime = Long.parseLong(in.getField(1).toString());

            out.collect("Key : " + key + " Window : " +
                    context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
                    elapsed.toString() + "  max handling time : " + h.toString() +
                    "  totalElapsed " + totalElapsed);
        }
    }
}


Re: processwindowfunction output Iterator

2022-03-01 Thread HG
OK,
I had the impression, obviously incorrect, that it could be called only once.

Thanks


RE: processwindowfunction output Iterator

2022-03-01 Thread Schwalbe Matthias
Good morning Hans,

You can call out.collect(…) multiple times, i.e. once for each event you forward.
How about this?

Thias


This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
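
Putting the answers together for the use case in the original question (sort the keyed events, subtract consecutive timestamps, emit every event with its elapsed time), a sketch of the process() body could look like the following. It is Flink-free so it can run on its own: the Event record is a hypothetical stand-in for the Tuple4, and a java.util.function.Consumer<String> plays the role of Flink's Collector<String>, called once per event.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

public class ElapsedEmitter {
    // Hypothetical event type standing in for the Tuple4 in the thread.
    public record Event(String transactionId, long handlingTime) {}

    // Mirrors a process() call: sort the window's events by handlingTime,
    // then emit one enriched record per event through `out`.
    public static void emitWithElapsed(String key, Iterable<Event> input, Consumer<String> out) {
        List<Event> events = new ArrayList<>();
        input.forEach(events::add);
        events.sort(Comparator.comparingLong(Event::handlingTime));

        Long previous = null;
        long totalElapsed = 0L;
        for (Event e : events) {
            // The first event of the window has no predecessor, so its elapsed time is 0.
            long elapsed = (previous == null) ? 0L : e.handlingTime() - previous;
            totalElapsed += elapsed;
            previous = e.handlingTime();
            out.accept("Key : " + key + " transactionId : " + e.transactionId()
                    + " elapsed : " + elapsed + " totalElapsed : " + totalElapsed);
        }
    }
}
```

Unlike the code in the sort thread above, which starts pHandlingTime at 0 so that the first elapsed value equals the first timestamp, this sketch reports 0 for the first event of each window; which behaviour is right depends on what "elapsed" should mean.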


processwindowfunction output Iterator

2022-02-28 Thread HG
Hi,


Can a ProcessWindowFunction output an Iterator?
I need to sort keyed events, subtract their timestamps, and then output
them all with the elapsed times added.

Regards Hans


Re: clear() in a ProcessWindowFunction

2021-04-10 Thread Vishal Santoshi
Thank you for the confirmation. The simulations confirm it too.


Re: clear() in a ProcessWindowFunction

2021-04-09 Thread Roman Khachatryan
Hi Vishal,

Sorry for the late reply; please find my answers below.
By "state" I assume you mean the state obtained via getRuntimeContext (access to
window state is not allowed).

> The state is scoped to the key (created per key in the ProcessWindowFunction 
> with a ttl )
Yes.

> The state will remain alive irrespective of whether the Window is closed or 
> not (a TTL timer does the collection )
Right, but you need to enable TTL on the state descriptor used to access the state [1].

>  The execution on a key is sequential , as in if 2 events arrive for the 2 
> Sessions they happen sequentially ( or in any order but without the need of 
> synchronization )
Right.

> The state mutated by an event in Session A, will be visible to Session B if 
> an event incident on Session B was to happen subsequently.  There is no need 
> of synchronizing access to the state as it for the same key.
Right.

Your understanding of merging of window contents is also correct.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman
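
The TTL point can be pictured with a small Flink-free model (hypothetical class, not Flink API). In Flink itself, TTL is configured with a StateTtlConfig built via StateTtlConfig.newBuilder(...) and attached to the descriptor with enableTimeToLive(...); by default the timestamp is refreshed on create and write, and expired values are never returned. The model below mimics those defaults for the value of a single key.

```java
// Flink-free model (hypothetical, not Flink API) of key-scoped value state with
// a time-to-live: the timestamp is refreshed on create and write only, and an
// expired value is never returned.
public class TtlValueModel<T> {
    private final long ttlMillis;
    private T value;
    private long lastWriteMillis;

    public TtlValueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void update(T newValue, long nowMillis) {
        value = newValue;
        lastWriteMillis = nowMillis;
    }

    // Reading does not refresh the timestamp. Once the entry is at least
    // ttlMillis old it is dropped and null is returned (cleanup on read).
    public T value(long nowMillis) {
        if (value != null && nowMillis - lastWriteMillis >= ttlMillis) {
            value = null;
        }
        return value;
    }
}
```

Without a TTL, such key-scoped state lives until it is cleared explicitly, even after all windows of the key have been purged, which matches the earlier answer in this thread that window state is cleared but global state is not.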


Re: clear() in a ProcessWindowFunction

2021-03-31 Thread Vishal Santoshi
I have a question. Say I have a single key with two live sessions (A and B)
and a configured allowed lateness.

Do these invariants hold?

* The state is scoped to the key (created per key in the
ProcessWindowFunction with a TTL).
* The state will remain alive regardless of whether the window is closed
or not (a TTL timer does the collection).
* The execution on a key is sequential: if two events arrive for the
two sessions, they are processed sequentially (in any order, but without the need
for synchronization).
* The state mutated by an event in session A will be visible to session B
if an event for session B arrives subsequently. There is no
need to synchronize access to the state, as it is for the same key.

What I am not sure about is what happens when session A merges with session
B. I would assume that Flink just defines a new start and end for the merged
window, GCs the old ones (or at least one of them), and assigns the
event to that new window. Whatever one does with the custom state in the
ProcessWindowFunction (there is a CountTrigger of 1), i.e. what is
done in the process method above, is one degree removed
from whatever Flink does internally with its merges, given that the state
is scoped to the key.
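
The merge behaviour described here (and confirmed in Roman's reply above) can be sketched without Flink: each element at time t gets a session window [t, t + gap), and any windows that overlap or touch are replaced by one window spanning them, which is how I understand Flink's TimeWindow merging for session windows. Names are hypothetical; the sketch only models the window bounds, not state or triggers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {
    // A window covering [start, end), like Flink's TimeWindow.
    public record Window(long start, long end) {}

    // Gives each timestamp its own session window and merges as it goes.
    public static List<Window> assign(List<Long> timestamps, long gap) {
        List<Window> sessions = new ArrayList<>();
        for (long t : timestamps) {
            Window merged = new Window(t, t + gap);
            List<Window> kept = new ArrayList<>();
            for (Window w : sessions) {
                // Overlapping or adjacent windows are folded into the merged window.
                if (w.start() <= merged.end() && merged.start() <= w.end()) {
                    merged = new Window(Math.min(w.start(), merged.start()),
                                        Math.max(w.end(), merged.end()));
                } else {
                    kept.add(w);
                }
            }
            kept.add(merged);
            sessions = kept;
        }
        sessions.sort(Comparator.comparingLong(Window::start));
        return sessions;
    }
}
```

An element arriving between two existing sessions can bridge them, so two sessions of the same key can collapse into one; this is the case where any key-scoped custom state is unaffected by the merge, as discussed above.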


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Yep, makes sense.

On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan  wrote:

> > Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?
> Window state is cleared (as well as the window itself), but global
> state is not (unless you use TTL).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> Regards,
> Roman


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
> Want to confirm that the keys are GCed ( along with state ) once the  
> (windows close + lateness ) ?
Window state is cleared (as well as the window itself), but global
state is not (unless you use TTL).

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman
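The point that key-scoped (global) state is only removed if you configure TTL can be illustrated with a toy model. The sketch below is plain Java with a hypothetical class name and an explicit clock; it is not Flink code. It assumes the semantics of Flink's StateTtlConfig with UpdateType.OnCreateAndWrite (each write restarts the TTL) and StateVisibility.NeverReturnExpired (an expired value reads as absent). In Flink itself the TTL is attached to the state descriptor with enableTimeToLive(...), and when expired entries are physically removed depends on the configured cleanup strategy.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of key-scoped value state with a time-to-live. Hypothetical class, NOT Flink's API;
// the clock is passed in explicitly so the behaviour is deterministic.
class TtlValueModel<K, V> {
    private static final class Entry<T> {
        final T value;
        final long lastWrite;

        Entry(T value, long lastWrite) {
            this.value = value;
            this.lastWrite = lastWrite;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    TtlValueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Mirrors UpdateType.OnCreateAndWrite: every write restarts the TTL clock.
    void update(K key, V value, long now) {
        entries.put(key, new Entry<>(value, now));
    }

    // Mirrors StateVisibility.NeverReturnExpired: an expired value reads as null.
    // In this model the expired entry is also dropped on read.
    V value(K key, long now) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (now - e.lastWrite >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    int size() {
        return entries.size();
    }
}
```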

On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
 wrote:
>
> Sometimes writing it down makes you think. I now realize that this is not the 
> right approach, given that merging windows will have their own states..and 
> how the merge happens is really at the key level


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
Sometimes writing it down makes you think. I now realize that this is not
the right approach, given that merging windows will have their own
states, and how the merge happens is really at the key level.



On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi 
wrote:

> I intend to augment every event in a session  with a unique ID.  To keep
> the session lean, there is a PurgingTrigger on this aggregate that  fires
> on a count of 1.
>
> >> (except that the number of keys can grow).
>
> Want to confirm that the keys are GCed ( along with state ) once the
> (windows close + lateness ) ?


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Vishal Santoshi
I intend to augment every event in a session with a unique ID. To keep
the session lean, there is a PurgingTrigger on this aggregate that fires
on a count of 1.

>> (except that the number of keys can grow).

Want to confirm that the keys are GCed (along with state) once the
windows close (plus lateness)?
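The "windows close + lateness" moment can be computed explicitly. The sketch assumes these Flink conventions:

- an event-time TimeWindow covers [start, end);
- its maxTimestamp() is end - 1;
- the window's contents are dropped once the watermark reaches maxTimestamp() + allowedLateness.

WindowCleanup is a hypothetical helper. Per Roman's answer in this thread, this cleanup concerns window state only, not key-scoped state.

```java
// Hypothetical helper (not Flink API): cleanup point of an event-time window [start, end).
class WindowCleanup {
    // The last timestamp that still belongs to the window.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Watermark value at which the window's contents are dropped.
    static long cleanupTime(long windowEnd, long allowedLatenessMillis) {
        long max = maxTimestamp(windowEnd);
        long cleanup = max + allowedLatenessMillis;
        // If the addition overflowed, the window is never cleaned up by time.
        return cleanup >= max ? cleanup : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Window [0, 5000) with 2 seconds of allowed lateness.
        System.out.println(cleanupTime(5000L, 2000L)); // 6999
    }
}
```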



On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan  wrote:

> Hi Vishal,
>
> There is no leak in the code you provided (except that the number of
> keys can grow).
> But as you figured out the state is scoped to key, not to window+key.
>
> Could you explain what you are trying to achieve and why do you need to
> combine
> sliding windows with state scoped to window+key?
>
> Regards,
> Roman


Re: clear() in a ProcessWindowFunction

2021-03-12 Thread Roman Khachatryan
Hi Vishal,

There is no leak in the code you provided (except that the number of
keys can grow).
But as you figured out the state is scoped to key, not to window+key.

Could you explain what you are trying to achieve and why do you need to combine
sliding windows with state scoped to window+key?

Regards,
Roman

On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
 wrote:
>
> Essentially, Does this code leak state
>
> private static class SessionIdProcessWindowFunction java.io.Serializable, VALUE extends java.io.Serializable>
> extends
> ProcessWindowFunction, 
> KeyedSessionWithSessionID, KEY, TimeWindow> {
> private static final long serialVersionUID = 1L;
> private final static ValueStateDescriptor sessionId = new 
> ValueStateDescriptor("session_uid",
> String.class);
>
> @Override
> public void process(KEY key,
> ProcessWindowFunction, 
> KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
> Iterable> elements, 
> Collector> out)
> throws Exception {
> // I need this scoped to key/window
> if (getRuntimeContext().getState(sessionId).value() == null) {
> UUID uuid = UUID.randomUUID();
> getRuntimeContext().getState(sessionId).update(uuid.toString());
> }
> String uuid = getRuntimeContext().getState(sessionId).value();
> out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), 
> uuid));
> }
> }


Re: clear() in a ProcessWindowFunction

2021-03-11 Thread Vishal Santoshi
Essentially, does this code leak state?

private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
        extends ProcessWindowFunction, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private final static ValueStateDescriptor sessionId =
            new ValueStateDescriptor("session_uid", String.class);

    @Override
    public void process(KEY key,
            ProcessWindowFunction, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
            Iterable> elements, Collector<KeyedSessionWithSessionID> out)
            throws Exception {
        // I need this scoped to key/window
        if (getRuntimeContext().getState(sessionId).value() == null) {
            UUID uuid = UUID.randomUUID();
            getRuntimeContext().getState(sessionId).update(uuid.toString());
        }
        String uuid = getRuntimeContext().getState(sessionId).value();
        out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
    }
}



clear() in a ProcessWindowFunction

2021-03-11 Thread Vishal Santoshi
Hello folks,
  The suggestion is to use windowState() for key-per-window state and
clear the state explicitly. Also, it seems that getRuntime().getState()
will return a globalWindow() where state is shared among windows with the
same key. Of course I want state scoped to a key per window, so I wanted
to use windowState(). The caveat is that my window is a Session Window, and
when I try to use clear() this exception is thrown (Session Windows are
Merging Windows):

Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.


The questions are:

* How do I have state per *session* window / per key and still be able to
clear it?
* Does getRuntime().getState() give me the clear() semantics for free, along
with state per window per key? Or have I understood getRuntime().getState()
wrong?

Regards.
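Per-window state is rejected for merging windows. One possible workaround is to track sessions per key yourself, for example in a KeyedProcessFunction holding key-scoped state plus a timer for cleanup. This is an assumption, not a recommendation from this thread. The plain-Java model below captures only the gap logic. GapSessionizer, and the map that stands in for keyed state, are hypothetical. It assumes events arrive in timestamp order per key: unlike Flink's merging session windows, it cannot merge two sessions when a late event bridges the gap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical per-key sessionizer (plain Java, not Flink API). A key's session ends once
// no event for that key arrives within gapMillis; each session gets one random UUID.
class GapSessionizer {
    private static final class SessionState {
        String sessionId;
        long lastEventTime;
    }

    private final long gapMillis;
    // Stand-in for key-scoped state, e.g. a ValueState inside a KeyedProcessFunction.
    private final Map<String, SessionState> stateByKey = new HashMap<>();

    GapSessionizer(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Returns the session ID for this event. Assumes non-decreasing timestamps per key.
    String assign(String key, long eventTime) {
        SessionState s = stateByKey.get(key);
        if (s == null || eventTime - s.lastEventTime >= gapMillis) {
            s = new SessionState();
            s.sessionId = UUID.randomUUID().toString();
            stateByKey.put(key, s);
        }
        s.lastEventTime = eventTime;
        return s.sessionId;
    }

    // Drops a key's state once its session has timed out (a timer could trigger this in a job).
    void expire(String key, long now) {
        SessionState s = stateByKey.get(key);
        if (s != null && now - s.lastEventTime >= gapMillis) {
            stateByKey.remove(key);
        }
    }

    int trackedKeys() {
        return stateByKey.size();
    }
}
```

Dropping the entry in expire() keeps the number of tracked keys from growing without bound. That growth is the one concern raised in reply to the original code.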


Re: How to use ProcessWindowFunction in pyflink?

2021-02-24 Thread Arvid Heise
Hi Hongyuan,

it seems as if PyFlink's DataStream API still lacks window support
[1]; it is targeted for the next release.

Examples for windows in PyFlink's Table API are available here [2].

from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col

orders = t_env.from_path("Orders")
result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
    .group_by(orders.a, col('w')) \
    .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))



[1] https://issues.apache.org/jira/browse/FLINK-21202
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/operations.html#aggregations

On Fri, Feb 19, 2021 at 8:26 AM Hongyuan Ma  wrote:

> Greetings,
>
> I am a newbie to pyflink. I want to be able to use processWindowFunction
> in a Tumble Window, and finally output 0 or more lines. I have checked the
> datastreamAPI and TableAPI of pyflink, but have not found a complete
> example. pyflink's datastream API does not seem to implement window() yet.
> And I'm not sure how to use TableAPI.
>
> If I use java to implement "public class MyProcessWindowFunct extends
> ProcessWindowFunction” and registered as udf in python,
> is it possible to call it through select statement in pyflink? Can the
> select statement correctly return zero or more rows of results?
>
> Any help will be appreciated!
>
> -
> Best Regards,
> Hongyuan Ma
>


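How to use a full-window aggregation ProcessWindowFunction in PyFlink

2021-02-18 Thread Hongyuan Ma
Greetings,


I am a PyFlink newbie. I would like to use a processWindowFunction in a Tumble Window
to compute over all of the data in the window and finally output zero or more rows. I looked through PyFlink's DataStream API and Table API but did not find a complete example. PyFlink's
DataStream API does not seem to implement window() yet, and I am not yet clear on how to use the Table API.
Suppose I implement "public class MyProcessWindowFunct extends ProcessWindowFunction
{}" in Java, package it as a jar, and register it as a UDF in PyFlink.
Could it be called through a select statement in the Table API, and could the select statement correctly return zero or more rows? I would be very grateful for a simple PyFlink
processWindowFunction example!


Thanks in advance for your help!
Hongyuan Ma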

How to use ProcessWindowFunction in pyflink?

2021-02-18 Thread Hongyuan Ma
Greetings,


I am a newbie to PyFlink. I want to use a processWindowFunction in a
Tumble Window and finally output 0 or more rows. I have checked the
DataStream API and Table API of PyFlink, but have not found a complete example.
PyFlink's DataStream API does not seem to implement window() yet, and I'm not
sure how to use the Table API.


If I implement "public class MyProcessWindowFunct extends
ProcessWindowFunction" in Java and register it as a UDF in Python,
is it possible to call it through a select statement in PyFlink? Can the select
statement correctly return zero or more rows of results?


Any help will be appreciated!


-
Best Regards,
Hongyuan Ma

Re: What is the difference between RuntimeContext state and ProcessWindowFunction.Context state when using a ProcessWindowFunction?

2021-02-11 Thread Arvid Heise
Hi Marco,

1. RuntimeContext is available to all operators and is bound to the current
key (if used in a keyed context).
2. Is actually the same. I'm not sure why it's exposed twice, so use either
one.
3. Is additionally bound to the current window.
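The three points above can be modeled without Flink. The plain-Java toy below uses hypothetical class and method names and is not Flink's API. It assumes what the answer says:

- runtime-context state and globalState() are both scoped to the current key;
- windowState() is additionally scoped to the window.

It also assumes the function clears its per-window state in clear(Context) when a window expires, as Flink's documentation asks you to do for per-window state. Windows are identified by their start timestamp, which presumes fixed-size windows.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of state scoping in a ProcessWindowFunction (hypothetical names, NOT Flink's API).
class StateScopesModel {
    // Key-scoped: what both getRuntimeContext().getState(...) and context.globalState() see.
    private final Map<String, Long> keyScoped = new HashMap<>();
    // (key, window)-scoped: what context.windowState() sees; inner map is keyed by window start.
    private final Map<String, Map<Long, Long>> windowScoped = new HashMap<>();

    void addToKeyScoped(String key, long delta) {
        keyScoped.merge(key, delta, Long::sum);
    }

    void addToWindowScoped(String key, long windowStart, long delta) {
        windowScoped.computeIfAbsent(key, k -> new HashMap<>()).merge(windowStart, delta, Long::sum);
    }

    Long getKeyScoped(String key) {
        return keyScoped.get(key);
    }

    Long getWindowScoped(String key, long windowStart) {
        Map<Long, Long> perWindow = windowScoped.get(key);
        return perWindow == null ? null : perWindow.get(windowStart);
    }

    // Models window expiry when the function clears its per-window state in clear(Context);
    // key-scoped state is left untouched.
    void purgeWindow(String key, long windowStart) {
        Map<Long, Long> perWindow = windowScoped.get(key);
        if (perWindow != null) {
            perWindow.remove(windowStart);
            if (perWindow.isEmpty()) {
                windowScoped.remove(key);
            }
        }
    }
}
```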

On Tue, Feb 9, 2021 at 10:46 PM Marco Villalobos 
wrote:

> Hi,
>
> I am having a difficult time distinguishing the difference between
> RuntimeContext state and global state when using a ProcessWindowFunction.
>
> A ProcessWindowFunction can access three different kinds of state:
> 1. RuntimeContext state.
> 2. ProcessWindowFunction.Context global state.
> 3. ProcessWindowFunction.Context window state.
>
> It's clear to me that the window state belongs to a window, but the line
> between RuntimeContext state and ProcessWindowFunction.Context global
> state seems a bit blurred.
>
> Can somebody please elaborate on the differences, and when I should use
> global state vs runtime context state?
>
> Thank you.
>


What is the difference between RuntimeContext state and ProcessWindowFunction.Context state when using a ProcessWindowFunction?

2021-02-09 Thread Marco Villalobos
Hi,

I am having a difficult time distinguishing the difference between
RuntimeContext state and global state when using a ProcessWindowFunction.

A ProcessWindowFunction can access three different kinds of state:
1. RuntimeContext state.
2. ProcessWindowFunction.Context global state.
3. ProcessWindowFunction.Context window state.

It's clear to me that the window state belongs to a window, but the line
between RuntimeContext state and ProcessWindowFunction.Context global
state seems a bit blurred.

Can somebody please elaborate on the differences, and when I should use
global state vs runtime context state?

Thank you.


Reply: Why can't ProcessWindowFunction clean up state in its clear() method? - v1.10.1

2020-08-27 Thread x
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
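The reply above passes an offset of Time.hours(-8) to a one-day TumblingEventTimeWindows. Flink's Javadoc for TumblingEventTimeWindows.of(size, offset) uses this same example to get daily windows aligned to midnight in UTC+08:00 instead of midnight UTC. The plain-Java sketch below works through the window-start arithmetic. WindowStart is a hypothetical class. The formula is my understanding of what Flink's TimeWindow.getWindowStartWithOffset computes, not a copy of it.

```java
import java.time.Duration;
import java.time.Instant;

// Start of the tumbling window that contains `timestamp`, given a window size and an offset.
// Assumed to match the arithmetic of Flink's TimeWindow.getWindowStartWithOffset.
class WindowStart {
    static long startWithOffset(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, size).
        if (remainder < 0) {
            remainder += size;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long eightHours = Duration.ofHours(8).toMillis();
        // 00:30 on 2020-08-27 in UTC+8.
        long t = Instant.parse("2020-08-26T16:30:00Z").toEpochMilli();
        // Without an offset the daily window starts at midnight UTC (08:00 in UTC+8).
        System.out.println(Instant.ofEpochMilli(startWithOffset(t, 0, day)));           // 2020-08-26T00:00:00Z
        // With an offset of -8 hours it starts at 16:00 UTC, i.e. midnight in UTC+8.
        System.out.println(Instant.ofEpochMilli(startWithOffset(t, -eightHours, day))); // 2020-08-26T16:00:00Z
    }
}
```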




----
From: "x" <35907...@qq.com>
Sent: 2020-08-27 2:00
To: "user-zh@flink.apache.org"

Reply: Why can't ProcessWindowFunction clean up state in its clear() method? - v1.10.1

2020-08-27 Thread x
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))


----
From: "user-zh"



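Re: Why can't ProcessWindowFunction clean up state in its clear() method? - v1.10.1

2020-08-25 Thread shizk233
As I understand it, taking the aggregate(AggregateFunction aggFunction,
ProcessWindowFunction windowFunction) method as a reference,
the window's state data is stored in an aggregate function; the processWindowFunction only processes the window's result, so you need to get the corresponding window state through the context to clean it up.

On Tue, Aug 25, 2020 at 6:25 PM, x <35907...@qq.com> wrote:

>
> I open windows by day and fire them every minute. I overrode ProcessWindowFunction's clear method, hoping to clean up the state when the window ends, but in actual execution the state is not cleared across days. The code is as follows:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)…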
> .window(TumblingEventTimeWindows.of(Time.days(1)))
> .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
> .evictor(TimeEvictor.of(Time.seconds(0), true))
> .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
> private var state: MapState[String,Boolean] = _
> override def open
> override def process
> override def clear(ctx: Context): Unit = {
> state.clear()
> }
> }


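Why can't ProcessWindowFunction clean up state in its clear() method? - v1.10.1

2020-08-25 Thread x
I open windows by day and fire them every minute. I overrode ProcessWindowFunction's clear method, hoping to clean up the state when the window ends, but in actual execution the state is not cleared across days. The code is as follows:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)…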
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
private var state: MapState[String,Boolean] = _
override def open
override def process
override def clear(ctx: Context): Unit = {
state.clear()
}
}

Re: Reply: How to effectively clear state in a ProcessWindowFunction?

2020-03-31 Thread Yun Tang
Hi

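I find it strange that your whole program runs at all when started without a checkpoint. Your value state descriptor does not define a default
value, so calling #value() returns null. Yet the very first call to #update reads its value from that state, and the job still runs. That is strange.

I suggest debugging it locally in your IDE. You can change the clear condition so that it does not wait until the next day to clear, which should let you reproduce the problem locally.

Best,
Yun Tang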

From: 守护 <346531...@qq.com>
Sent: Tuesday, March 31, 2020 16:31
To: user-zh 
Subject: Reply: How to effectively clear state in a ProcessWindowFunction?

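Thanks for your reply.


In the code, the if(stateDate.equals("") ||
stateDate.equals(date)) check does actually reach pv_st.clear(). 1. However, when the final result is output, the state in pv_st has not been cleared and keeps being accumulated. 2. After state.clear(), the next read returns null. The code snippet indeed has no null check and instead updates the value directly with pv_st.update(pv_st.value() +
c_st). Could that be related?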








-- Original message --
From: "Yun Tang"

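Reply: How to effectively clear state in a ProcessWindowFunction?

2020-03-31 Thread 守护
Thanks for your reply.


In the code, the if(stateDate.equals("") || stateDate.equals(date)) check does actually reach pv_st.clear(). 1. However, when the final result is output, the state in pv_st has not been cleared and keeps being accumulated. 2. After state.clear(), the next read returns null. The code snippet indeed has no null check and instead updates the value directly with pv_st.update(pv_st.value() + c_st). Could that be related?


----
From: "Yun Tang"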

Re: How to effectively clear state in a ProcessWindowFunction?

2020-03-31 Thread Yun Tang
Hi

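Judging from the code, for if(stateDate.equals("") || stateDate.equals(date))
I can't tell where the stateDate variable actually gets its value, so it's unclear whether this check takes effect.
Second, after state.clear(), the next read returns null, and I don't see a check for that anywhere in the code snippet.

Best,
Yun Tang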

From: 守护 <346531...@qq.com>
Sent: Tuesday, March 31, 2020 12:33
To: user-zh 
Subject: How to effectively clear state in a ProcessWindowFunction?

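Hello everyone,


-- Version
Flink 1.10.0 on YARN


-- Steps
1. Define a .window(TumblingProcessingTimeWindows.of(Time.days(1))) window.
2. Define a new Trigger: .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))).
3. Define a new ProcessWindowFunction() that computes once every fixed interval. The business requirement is to count each day's data starting from 00:00, then reset and recompute from scratch on the next day.
-- Problem
In the new ProcessWindowFunction() I created a ValueState and want it cleared at 00:00 the next day so that counting starts over. However, I found that the ValueState is not cleared; the count keeps accumulating on top of the previous day's. Where should the .clear() call go for it to take effect?



-- Partial code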


  .window(TumblingProcessingTimeWindows.of(Time.days(1)))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
  .process(new ProcessWindowFunction[(String, String, Long), String, Tuple, TimeWindow] {

    private var pv_st: ValueState[Long] = _

    override def open(parameters: Configuration): Unit = {
      pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
    }

    override def process(key: Tuple, context: Context,
                         elements: Iterable[(String, String, Long)], out: Collector[String]): Unit = {
      var c_st = 0

      val elementsIterator = elements.iterator
      // Iterate over the window's data to get the unique words
      while (elementsIterator.hasNext) {
        val ac_name = elementsIterator.next()._2
        if (!ac_name.isEmpty && ac_name.equals("listentime")) {
          c_st += 1
        }
      }
      val time: Date = new Date()
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("-MM-dd")
      val date = dateFormat.format(time)

      // add current
      pv_st.update(pv_st.value() + c_st)

      var jsonStr = "" + key.getField(0) + "_" + date + "" // start of the JSON string
      jsonStr += "{" +
        "\"yesterday_foreground_play_pv\":\"" + pv_st.value() +
        "\"}"

      // Check whether the next day has started; if so, clear all state data and start accumulating again
      if (stateDate.equals("") || stateDate.equals(date)) {
        stateDate = date
        out.collect(jsonStr)
      } else {
        out.collect(jsonStr)
        pv_st.clear()
        stateDate = date
      }
    }
  })
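Yun Tang's point about null can be handled with a null-safe read. Below is a minimal plain-Java model of the accumulate-and-reset-on-a-new-day logic, with these assumptions:

- two maps stand in for keyed ValueStates, so the day is tracked per key rather than in a variable such as stateDate, whose definition is not shown above;
- the day is passed in explicitly instead of being read from new Date().

DailyCounter and its method are hypothetical. In Java, unboxing a null Long returned by value() would throw a NullPointerException, which the null check avoids.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a per-key daily counter (hypothetical, not Flink API).
class DailyCounter {
    // Stand-ins for two keyed ValueStates: the running count and the day it belongs to.
    private final Map<String, Long> countState = new HashMap<>();
    private final Map<String, LocalDate> dayState = new HashMap<>();

    // Adds delta to the key's count for `day`, starting from zero when the day changes.
    long add(String key, LocalDate day, long delta) {
        if (!day.equals(dayState.get(key))) {
            countState.remove(key); // like ValueState.clear(): the next read returns null
            dayState.put(key, day);
        }
        Long current = countState.get(key); // null right after a clear, or for a new key
        long updated = (current == null ? 0L : current) + delta;
        countState.put(key, updated);
        return updated;
    }
}
```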


ProcessWindowFunction: request for help

2020-03-18 Thread 郭红科
Hi experts,
Error message:
Error:(73, 27) java: no suitable method found for process(com.linkedsee.aiops.CountErrorFunction)
    method org.apache.flink.streaming.api.datastream.DataStream.process(org.apache.flink.streaming.api.functions.ProcessFunction) is not applicable
      (cannot infer type-variable(s) R
        (argument mismatch; com.linkedsee.aiops.CountErrorFunction cannot be converted to org.apache.flink.streaming.api.functions.ProcessFunction))
    method org.apache.flink.streaming.api.datastream.DataStream.process(org.apache.flink.streaming.api.functions.ProcessFunction,org.apache.flink.api.common.typeinfo.TypeInformation) is not applicable
      (cannot infer type-variable(s) R
        (actual and formal argument lists differ in length))

StreamingJob.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.linkedsee.aiops;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import java.util.*;

/**
 * Skeleton for a Flink Streaming Job.
 *
 * For a tutorial how to write a Flink streaming application, check the
 * tutorials and examples on the Flink Website (https://flink.apache.org/docs/stable/).
 *
 * To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class StreamingJob {

   public static void main(String[] args) throws Exception {
  // set up the streaming execution environment
  final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "xxx:9092");
  properties.setProperty("auto.offset.reset", "earliest");
  properties.setProperty("group.id", "test");

  FlinkKafkaConsumer fConsumer =
new  FlinkKafkaConsumer<>("aiops_log", new 
JSONKeyValueDeserializationSchema(false), properties);
  fConsumer.assignTimestampsAndWatermarks(new AssignerWatermarks());

  DataStream dataStream = env
.addSource(fConsumer);

  dataStream
.keyBy(new KeySelector() {
@Override
public Object getKey(ObjectNode objectNode) throws 
Exception {
return objectNode.get("value").get("type");
}
})
.timeWindow(Time.seconds(60));

  dataStream.process(new CountErrorFunction());

dataStream.print();


  /*
   * Here, you can start creating your execution plan for Flink.
   *
   * Start with getting some data from the environment, like
   * env.readTextFile(textPath);
   *
   * then, transform the resulting DataStream using operations
   * like
   * .filter()
   * .flatMap()
   * .join()
   * .coGroup()
   *
   * and many more.
   * Have a look at the programming guide for the Java API:
   *
   * https://flink.apache.org/docs/latest/apis/streaming/index.html
   *
   */

  // execute program
  env.execute("Error Detection");
   }


}
CountErrorFunction.java 
package com.linkedsee.aiops;

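// Use the shaded Jackson ObjectNode emitted by JSONKeyValueDeserializationSchema;
// jdk.nashorn.internal.ir.ObjectNode is an unrelated JDK-internal class.
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;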
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class CountErrorFunction extends ProcessWindowFunction {

@Override
public void process(String s, 

Re:Re: FLINK: ProcessWindowFunction differences between StateBackends

2020-01-07 Thread USERNAME
Thanks to Mr. Tang for the answer!



Re:Re:Re: FLINK: ProcessWindowFunction differences between StateBackends

2020-01-07 Thread USERNAME
TTL does not seem to support TimeCharacteristic.EventTime.



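On 2020-01-08 14:17:11, "USERNAME" wrote:
>My case needs TriggerResult.FIRE_AND_PURGE in the trigger to clear the data of the window currently being computed, so that the computation is incremental. That is somewhat different from TTL, isn't it?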
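To make the FIRE vs. FIRE_AND_PURGE distinction concrete, here is a stdlib-only Java model. It is a sketch, not Flink's WindowOperator: `PeriodicWindowModel` and its `Result` enum are hypothetical names. In Flink itself the corresponding pieces are a Trigger that returns TriggerResult.FIRE_AND_PURGE, or PurgingTrigger, which wraps an existing trigger and returns FIRE_AND_PURGE whenever the nested trigger fires.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Stdlib-only model (hypothetical, not Flink code) of one window's contents
 * under a periodic trigger. FIRE evaluates the window and keeps its contents;
 * FIRE_AND_PURGE evaluates and then clears them, so the next firing only sees
 * elements that arrived after the previous one.
 */
final class PeriodicWindowModel {
    enum Result { FIRE, FIRE_AND_PURGE }

    private final List<Integer> contents = new ArrayList<>();

    void add(int value) {
        contents.add(value);
    }

    // Called at each fixed interval; returns the sum emitted for this firing.
    int onPeriodicTimer(Result result) {
        int sum = 0;
        for (int v : contents) {
            sum += v;
        }
        if (result == Result.FIRE_AND_PURGE) {
            contents.clear(); // drop the elements that have already been computed
        }
        return sum;
    }
}
```

With FIRE, a second firing after adding 4 to a window holding 1 and 2 emits 7; with FIRE_AND_PURGE it emits only 4, which is the incremental behavior described above.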


Re: FLINK: ProcessWindowFunction differences between StateBackends

2020-01-07 Thread huoguo



Can the stale data be expired by setting a TTL?





Re: FLINK: ProcessWindowFunction differences between StateBackends

2020-01-07 Thread Yun Tang
Hi

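Using iterator.remove() to drop already-computed data from the state is not the standard approach; the standard approach is to clear the corresponding state [1].
The reason the data is removed with MemoryStateBackend is that get returns the object stored directly on the heap by the backend [2], so modifying it has side effects.
With the RocksDB state backend, get returns a deserialized list rather than the data RocksDB itself stores [3], so modifying it has no such side effect.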


[1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
[2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
[3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119

Best,
Yun Tang
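The difference described above can be modeled with plain Java collections. This is a sketch under stated assumptions, not Flink code: `HeapLikeListState` hands out the very list it stores (as the heap backend's get does), while `CopyingListState` returns a fresh copy on every call (standing in for RocksDB deserializing stored bytes). All class names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of a heap backend: get() returns the live stored list.
final class HeapLikeListState<T> {
    private final List<T> stored = new ArrayList<>();

    void add(T value) { stored.add(value); }

    Iterable<T> get() { return stored; }

    int size() { return stored.size(); }
}

// Hypothetical model of RocksDB: get() returns a new, deserialized-like copy.
final class CopyingListState<T> {
    private final List<T> stored = new ArrayList<>();

    void add(T value) { stored.add(value); }

    Iterable<T> get() { return new ArrayList<>(stored); }

    int size() { return stored.size(); }

    // The standard way to drop processed elements: clear the state itself.
    void clear() { stored.clear(); }
}

final class RemoveThroughIterator {
    // Mirrors the loop from the question: consume each element, then iter.remove().
    static <T> int consumeAndRemove(Iterable<T> elements) {
        int seen = 0;
        for (Iterator<T> iter = elements.iterator(); iter.hasNext(); ) {
            iter.next();
            seen++;
            iter.remove();
        }
        return seen;
    }
}
```

Running `consumeAndRemove` on the heap-like state empties what it stores, whereas on the copying state only the temporary copy shrinks; there, only `clear()` actually removes the data, which matches the advice to clear the state instead of removing through the iterator.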



FLINK: ProcessWindowFunction differences between StateBackends

2020-01-07 Thread USERNAME
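Hi all,
Happy New Year!


--Version
FLINK 1.9.1 ON YARN


--Procedure
1. Define an EventTimeSessionWindows.withGap(Time.hours(1)) window.
2. Define a new Trigger() that fires at a fixed interval and emits output.
3. Define a new ProcessWindowFunction() that computes at each fixed interval, emits the result, and does not keep data that has already been computed.
--Problem
In the new ProcessWindowFunction(), iter.remove() is used to drop data that has already been computed.
With MemoryStateBackend this works as expected: each fixed-interval computation excludes the values from the previous one.
With RocksDBStateBackend the iter.remove() step has no effect, and the next computation still includes the historical data.
Is there a better approach for this kind of computation?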


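--Partial code
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


// Event and String are placeholder types: the original type arguments were lost
new ProcessWindowFunction<Event, String, Tuple, TimeWindow>() {
    @Override
    public void process(Tuple tuple, Context context, Iterable<Event> elements,
                        Collector<String> out) throws Exception {
        for (Iterator<Event> iter = elements.iterator(); iter.hasNext(); ) {
            Event element = iter.next(); // remove() requires a preceding next()
            // ... compute on element and emit via out.collect(...) ...
            iter.remove(); // drop the element that has just been computed
        }
    }
}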









Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Thanks Alexander,
Will do.

Cheers

On Mon, Dec 2, 2019 at 3:23 PM Alexander Fedulov 
wrote:

> Hi Avi,
>
> In this situation I would propose stepping back and using a lower-level API,
> ProcessFunction. You can put your window elements into Flink-managed list
> state and handle clean-up, triggering, and periodic state mutations exactly
> as needed by implementing some additional timer logic.
>
> Best regards,
>
> --
>
> Alexander Fedulov | Solutions Architect
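A stdlib-only model of the approach Alexander suggests: per-key list state plus timers on an interval of their own, independent of any window. This is a sketch, not Flink code: `TimerDrivenBuffer` and its methods are hypothetical stand-ins for a ProcessFunction's processElement/onTimer and the timer service, and the clock is advanced manually. The one-timer-per-key-and-timestamp deduplication mirrors Flink's documented timer behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TimerDrivenBuffer {
    private final long intervalMs;
    // key -> buffered values (stands in for keyed ListState)
    private final Map<String, List<Long>> listState = new HashMap<>();
    // timestamp -> keys with a timer registered at that timestamp
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    TimerDrivenBuffer(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    void processElement(String key, long value, long nowMs) {
        listState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        // Register a timer at the end of the current interval, at most once per key.
        long fireAt = (nowMs / intervalMs + 1) * intervalMs;
        List<String> keys = timers.computeIfAbsent(fireAt, t -> new ArrayList<>());
        if (!keys.contains(key)) {
            keys.add(key);
        }
    }

    // Advances the clock and runs every timer due at or before nowMs.
    void advanceTo(long nowMs) {
        while (!timers.isEmpty() && timers.firstKey() <= nowMs) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    // Periodic mutation: emit the buffered sum for the key, then clean up its state.
    private void onTimer(String key, long timestamp) {
        List<Long> values = listState.remove(key);
        long sum = 0;
        if (values != null) {
            for (long v : values) {
                sum += v;
            }
        }
        output.add(key + "@" + timestamp + "=" + sum);
    }

    List<String> output() { return output; }

    int bufferedKeys() { return listState.size(); }
}
```

Because the interval is a constructor argument rather than a window size, the clean-up cadence (every X hours in Avi's case) is decoupled from any windowing.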


Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
I think the only way to do this is to add a keyed operator downstream that
holds the global state. Not ideal, but I don't see any other option.



Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Hi Vino,
I have a global state that I need to mutate every X hours (e.g. clean that
state or update its value). I thought there might be an option to set a
timer using the timerService with its own time interval, detached from the
window interval.



Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread vino yang
Hi Avi,

First, let's clarify: is the "timer" you mentioned the window's timer, or a
timer you want to register to trigger some action?

Best,
Vino




Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Hi,
Is there a way to fire a timer in a ProcessWindowFunction? I would like to
mutate the global state periodically.


Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread vino yang
Hi Michael,

From the WindowTranslationTest, I did not see anything about the
initialization of a mini-cluster. Here we are testing an operator, and the
operator test harness seems to provide the necessary infrastructure.

You can try it and see whether anything is missing.

Best,
Vino



Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread Nguyen, Michael
Hi Vino,

This is a great example – thank you!

It looks like I need to instantiate a StreamExecutionEnvironment in order to
get my OneInputStreamOperator. Would I need to set up a local Flink cluster using
MiniClusterWithClientResource in order to use StreamExecutionEnvironment?


Best,
Michael




Re: Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread vino yang
Hi Michael,

You may want to look at the `KeyedOneInputStreamOperatorTestHarness` test class.

You can take
`WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or
`WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime`[1] (both
of them call `processElementAndEnsureOutput`) as an example.

[1]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676

Best,
Vino



Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream

2019-10-28 Thread Nguyen, Michael
Hello everybody,

Has anyone tried testing AggregateFunction() and ProcessWindowFunction() on a 
KeyedDataStream? I have reviewed the testing page on Flink’s official website 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html)
 and I am not quite sure how I could utilize these two functions in an 
.aggregate() operator for my testing.

Here’s how I am using the AggregateFunction (EventCountAggregate()) and 
ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
DataStream> ec2EventsAggregate =
        ec2Events
                .keyBy(t -> t.f0)
                .timeWindow(Time.minutes(30))
                .aggregate(new EventCountAggregate(), new CalculateWindowTotal())
                .name("EC2 creation interval count");


EventCountAggregate() counts each element in the ec2Events datastream.

CalculateWindowTotal() takes the timestamp of each 30-minute window and
correlates it with the number of elements counted so far for that window,
returning a Tuple2 containing the end timestamp and the count of
elements.
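One way to test this logic without any cluster is to call the functions directly. The sketch below is stdlib-only: `CountAggregate` reuses the method names of Flink's AggregateFunction (createAccumulator, add, getResult, merge) but is a local stand-in, `WindowTotal.windowTotal` is a hypothetical equivalent of CalculateWindowTotal, and the String element type and long counts are assumptions.

```java
import java.util.List;

// Local stand-in shaped like Flink's AggregateFunction<String, Long, Long>.
final class CountAggregate {
    long createAccumulator() { return 0L; }

    long add(String event, long acc) { return acc + 1; } // one per element

    long getResult(long acc) { return acc; }

    long merge(long a, long b) { return a + b; }

    // Drives the aggregate the way a window would: fold every element in.
    long aggregate(List<String> events) {
        long acc = createAccumulator();
        for (String e : events) {
            acc = add(e, acc);
        }
        return getResult(acc);
    }
}

final class WindowTotal {
    // Pairs the window end timestamp with the pre-aggregated count.
    static long[] windowTotal(long windowEndMs, long count) {
        return new long[] {windowEndMs, count};
    }
}
```

This only covers the function logic; exercising the wired-up keyed, windowed operator (timers, watermarks, output) is what KeyedOneInputStreamOperatorTestHarness is for.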


Thanks,
Michael


Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-10-04 Thread Oliwer Kostera
Hi,

I actually created a Jira issue before posting to the mailing list. Today I added
steps to reproduce, along with test outcomes for different scenarios, to the repository.

Jira issue: https://issues.apache.org/jira/browse/FLINK-14197

Repository: https://github.com/loliver1234/flink-process-window-function

With best regards

Oliwer


Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-10-02 Thread Fabian Hueske
Hi Oliwer,

I think you are right. There seems to be something going wrong.
Just to clarify, you are sure that the growing state size is caused by the
window operator?

From your description I assume that the state size does not depend (solely)
on the number of distinct keys.
Otherwise, the state size would stop growing at some point.
This would be a hint that every window leaves some state behind.
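This reasoning can be illustrated with a stdlib-only model (not Flink code; `PerWindowStateModel` is a hypothetical name) that stores one entry per (key, window). Under the scenario from the report (1000 keys, one message per key per second, so every message opens its own session window), state either stays bounded or grows with the number of windows ever created, depending on whether each window's entry is cleared when the window is purged.

```java
import java.util.HashMap;
import java.util.Map;

final class PerWindowStateModel {
    // "key#windowStart" -> per-window value
    private final Map<String, Long> state = new HashMap<>();
    private final boolean clearOnPurge;

    PerWindowStateModel(boolean clearOnPurge) {
        this.clearOnPurge = clearOnPurge;
    }

    // One message for `key` opens a new session window starting at startMs,
    // which fires and is purged before the key's next message arrives.
    void openFireAndPurge(String key, long startMs) {
        String windowId = key + "#" + startMs;
        state.put(windowId, 1L);
        if (clearOnPurge) {
            state.remove(windowId); // the window's state is released on purge
        }
    }

    int entries() {
        return state.size();
    }
}
```

With a fixed set of 1000 keys, the leaking variant holds 10,000 entries after 10 simulated seconds and keeps growing, while the clearing variant returns to zero after every purge.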

AFAIK, processing time session windows are not very common. There might be
a bug in the implementation.

Could you create a Jira with a description of the problem?
It would be great, if you could provide a reproducible example with a data
generator source.

Thank you,
Fabian


Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-10-01 Thread Oliwer Kostera
Hi,

I'm not sure what you mean by windowState.clear(). If I understand you
correctly, it's the windowState from the ProcessWindowFunction Context, which is a
KeyedStateStore. The KeyedStateStore manages registered keyed states, which I
don't have, so without a descriptor I can't access any clear() method. There is
no state that I manage explicitly, as you can see here:
https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java

With best regards

Oliwer


Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-10-01 Thread Congxian Qiu
Hi Oliwer,

From the description, it seems the state is not being cleared. Maybe you
could check how many times {{windowState.clear()}} is triggered in
{{WindowOperator#processElement}} and try to figure out why the state is
not cleared.

Best,
Congxian


Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-09-27 Thread Oliwer Kostera
Hi all,


I'm using ProcessWindowFunction in a keyed stream with the following definition:

final SingleOutputStreamOperator processWindowFunctionStream =
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
                .process(new CustomProcessWindowFunction())
                .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
                .name("Process window function");


My checkpointing configuration uses the RocksDB state backend with
incremental checkpointing in EXACTLY_ONCE mode.

At runtime I noticed that even though data ingestion is static (same keys,
same message frequency), the state size of the process window operator keeps
increasing. I tried to reproduce this with a minimal, similar setup here:
https://github.com/loliver1234/flink-process-window-function and was able to
reproduce it.

Testing conditions:

  *   RabbitMQ source with exactly-once guarantee and 65k prefetch count
  *   RabbitMQ sink to collect messages
  *   Simple ProcessWindowFunction that only passes messages through
  *   Stream time characteristic set to TimeCharacteristic.ProcessingTime

Testing scenario:

  *   Start the Flink job and check the initial state size: 127 KB
  *   Start sending messages: the same 1000 unique keys every 1 s (consecutive messages for a key are farther apart than the 100 ms session gap, so each message should create a new window)
  *   The state of the process window operator keeps increasing: after 1 million messages it reached around 2 MB
  *   Stop sending messages and wait until the RabbitMQ queue is fully consumed and a few checkpoints have gone by
  *   I expected the state size to drop back to the base value, but it stayed at 2 MB
  *   Continue sending messages with the same keys: the state keeps increasing
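The expectation that each message opens its own window follows from how session windows are formed. The following is a rough, Flink-free model written in plain Java. It assigns each element the interval [t, t + gap) and merges intervals that overlap. This is a simplified assumption for illustration only: Flink's own merging may treat window boundaries differently, and SessionWindowModel, Window and sessions are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Flink-free model of session-window formation for ONE key.
// Assumption: each element gets the interval [t, t + gap) and intervals that
// overlap are merged; Flink's real merge logic may treat boundaries differently.
public class SessionWindowModel {

    // Half-open interval [start, end) in milliseconds (hypothetical helper type).
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Builds the sessions for timestamps given in ascending order.
    public static List<Window> sessions(long[] sortedTimestamps, long gapMs) {
        List<Window> result = new ArrayList<>();
        for (long t : sortedTimestamps) {
            Window candidate = new Window(t, t + gapMs);
            if (!result.isEmpty()) {
                Window last = result.get(result.size() - 1);
                if (candidate.start < last.end) {
                    // Overlaps the current session: extend it instead of opening a new one.
                    result.set(result.size() - 1,
                            new Window(last.start, Math.max(last.end, candidate.end)));
                    continue;
                }
            }
            result.add(candidate);
        }
        return result;
    }

    public static void main(String[] args) {
        // The reported scenario: one message per key every 1000 ms, 100 ms gap.
        long[] oncePerSecond = {0, 1000, 2000, 3000, 4000};
        System.out.println(sessions(oncePerSecond, 100).size()); // prints 5

        // Messages 50 ms apart stay within the gap and form a single session.
        long[] burst = {0, 50, 100, 150};
        System.out.println(sessions(burst, 100).size()); // prints 1
    }
}
```

Under this model, no two windows overlap with a 1 s interval and a 100 ms gap. Each window should therefore fire and be purged shortly after its message arrives, and no open windows should remain for any key once the input stops.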

What I checked:

  *   Registration and deregistration of timers set for time windows: each registration matched its deregistration
  *   Checked that there are in fact no window merges
  *   Tried a custom Trigger that disables window merging and returns TriggerResult.FIRE_AND_PURGE from onProcessingTime: same state behavior

Tested with:

  *   Local Flink Mini Cluster running from the IDE
  *   Flink HA standalone cluster running in Docker

In our staging environment, we noticed that the state of that operator keeps
increasing indefinitely, reaching as much as 1.5 GB for 100k unique keys
after a few months.

With best regards

Oliwer

adbglobal.com<https://www.adbglobal.com>


Re: Table API and ProcessWindowFunction

2019-07-12 Thread Hequn Cheng
Hi Flavio,

I think the reason we don't have interfaces like EventTimeObject and
ProcessingTimeObject is that we don't want time attributes to be defined
just anywhere; the intended place to define them is the source. If we added
interfaces like EventTimeObject and ProcessingTimeObject to Flink, it might
bring other problems, e.g. whether we should generate time attributes
wherever an object implements EventTimeObject or ProcessingTimeObject, since
such an object may appear in a source, an aggregate, or even a sink.

However, I think it's a good idea to add such logic in your own code. For
example, you can define a user-defined source that simply extracts time
attributes from EventTimeObject and ProcessingTimeObject, similar to the
examples in the training exercises.
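The following is a minimal sketch of this user-code approach, assuming you define the interface yourself. EventTimeObject, Ride and TimeAttributeExtraction are hypothetical names, and Flink ships none of them. In a real job, the extraction would sit inside the timestamp assignment of your own source rather than in a main method.

```java
// Hypothetical user-defined interface (NOT part of Flink).
interface EventTimeObject {
    // Event timestamp in epoch milliseconds.
    long getEventTime();
}

// Hypothetical POJO, loosely in the style of the training exercises' TaxiRide.
class Ride implements EventTimeObject {
    final long rideId;
    final long startTimeMillis;

    Ride(long rideId, long startTimeMillis) {
        this.rideId = rideId;
        this.startTimeMillis = startTimeMillis;
    }

    @Override
    public long getEventTime() {
        return startTimeMillis;
    }
}

public class TimeAttributeExtraction {

    // Works for any type implementing EventTimeObject, so a source (or any
    // other user code) can read the time attribute without knowing the concrete POJO.
    public static <T extends EventTimeObject> long extractTimestamp(T element) {
        return element.getEventTime();
    }

    public static void main(String[] args) {
        Ride ride = new Ride(1L, 1_562_760_000_000L);
        System.out.println(extractTimestamp(ride)); // prints 1562760000000
    }
}
```

Keeping the interface in user code avoids the question raised above of whether Flink should generate time attributes everywhere such an object appears.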

Best, Hequn


Re: Table API and ProcessWindowFunction

2019-07-11 Thread Flavio Pompermaier
Just one proposal: when working with streaming sources, you often need to
define which field is the processing-time or rowtime attribute. Right now you
can define the processing-time or event-time field by implementing
DefinedProctimeAttribute or DefinedRowtimeAttribute on the source. But this
only helps if you use the SQL API; with TableFunctions, for example, there is
no easy way to get the proctime/rowtime field. Also, in the Flink training
exercises [1] you use a POJO that has to implement a getEventTime() method to
retrieve the rowtime field.

So why not declare two general interfaces, such as EventTimeObject and
ProcessingTimeObject, so that my objects can implement them and I can easily
get the fields I need?

Best,
Flavio

[1]
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java

Re: Table API and ProcessWindowFunction

2019-07-11 Thread Flavio Pompermaier
Thanks Hequn, I'll give it a try!

Best, Flavio


Re: Table API and ProcessWindowFunction

2019-07-10 Thread Hequn Cheng
Hi,

> Can you provide a pseudo-code example of how to implement this?
Processing time
If you use a TumblingProcessingTimeWindows.of(Time.seconds(1)), for each
record, you get the timestamp from System.currentTimeMillis(), say t, and
w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000), and w_end =
w_start + 1000.

Event time
If you use a TumblingEventTimeWindows.of(Time.seconds(1)), for each record,
get the timestamp from the corresponding timestamp field, say t, and get
w_start and w_end same as above.

More examples can be found in TimeWindowTest[1].
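The computation described above can be written as a runnable plain-Java sketch. windowStart re-implements the formula that, as far as I know, TimeWindow.getWindowStartWithOffset uses in Flink 1.x: timestamp - (timestamp - offset + windowSize) % windowSize. The sketch assumes non-negative timestamps and has no Flink dependency. WindowBounds is a hypothetical class name.

```java
// Plain-Java sketch of tumbling-window bounds; no Flink dependency.
public class WindowBounds {

    // Start of the window containing `timestamp` (assumed to match
    // TimeWindow.getWindowStartWithOffset in Flink 1.x for non-negative timestamps).
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Exclusive end of the same window: w_end = w_start + windowSize.
    public static long windowEnd(long timestamp, long offset, long windowSize) {
        return windowStart(timestamp, offset, windowSize) + windowSize;
    }

    public static void main(String[] args) {
        long size = 1000L; // 1-second tumbling window

        // Processing time: t is System.currentTimeMillis().
        long now = System.currentTimeMillis();
        System.out.println("[" + windowStart(now, 0, size) + ", " + windowEnd(now, 0, size) + ")");

        // Event time: t comes from the record's timestamp field; fixed value here.
        long t = 12_345L;
        System.out.println(windowStart(t, 0, size)); // prints 12000
        System.out.println(windowEnd(t, 0, size));   // prints 13000
    }
}
```

The offset only shifts the alignment of the windows. For example, with an offset of 200 ms the same timestamp falls into [12200, 13200).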

Best, Hequn

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java


Re: Table API and ProcessWindowFunction

2019-07-10 Thread Flavio Pompermaier
The problem with the LATERAL JOIN (via a LookupableTableSource +
TableFunction, because I need to call that function with the userId as a
parameter) is that I cannot know the window start/end. It's not clear to me
how to get them from TimeWindow.getWindowStartWithOffset(timestamp, offset,
windowSize). Can you provide a pseudo-code example of how to implement this?

On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng  wrote:

> Hi Flavio,
>
> Thanks for your information.
>
> From your description, it seems that you only use the window to get the
> start and end time. There are no aggregations happen. If this is the case,
> you can get the start and end time by yourself(the
> `TimeWindow.getWindowStartWithOffset()` shows how to get window start
> according to the timestamp). To be more specific, if you use processing
> time, you can get your timestamp with System.currentTimeMillis(), and then
> use it to get the window start and end
> with `TimeWindow.getWindowStartWithOffset()`. For even time, you can get
> the timestamp from the rowtime field.
>
> With the start and end time, you can then perform LATERAL JOIN to enrich
> the information. You can add a cache in your table function to avoid
> frequent contacting with the REST endpoint.
>
> Best, Hequn
>
>
> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier 
> wrote:
>
>> Hi Hequn, thanks for your answer.
>> What I'm trying to do is to read a stream of events that basically
>> contains a UserId field and, every X minutes (i.e. using a Time Window) and
>> for each different UserId key, query 3 different REST services to enrich my
>> POJOs*.
>> For the moment what I do is to use a ProcessWindowFunction after the
>> .keyBy().window() as shown in the  previous mail example to contact those 3
>> services and enrich my object.
>>
>> However I don't like this solution because I'd like to use Flink to it's
>> full potential so I'd like to enrich my object using LATERAL TABLEs or
>> ASYNC IO..
>> The main problem I'm facing right now is that  I can't find a way to pass
>> the thumbing window start/end to the LATERAL JOIN table functions (because
>> this is a parameter of the REST query).
>> Moreover I don't know whether this use case is something that Table API
>> aims to solve..
>>
>> * Of course this could kill the REST endpoint if the number of users is
>> very big ..because of this I'd like to keep the external state of source
>> tables as an internal Flink state and then do a JOIN on the UserId. The
>> problem here is that I need to "materialize" them using Debezium (or
>> similar) via Kafka and dynamic tables..is there any example of keeping
>> multiple tables synched with Flink state through Debezium (without the need
>> of rewriting all the logic for managing UPDATE/INSERT/DELETE)?

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Hequn Cheng
Hi Flavio,

Thanks for your information.

From your description, it seems that you only use the window to get the
start and end time; no aggregation happens. If this is the case, you can
compute the start and end time yourself (the
`TimeWindow.getWindowStartWithOffset()` method shows how to get the window
start for a given timestamp). To be more specific, if you use processing
time, you can get your timestamp with System.currentTimeMillis() and then
use it to get the window start and end with
`TimeWindow.getWindowStartWithOffset()`. For event time, you can get the
timestamp from the rowtime field.
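A minimal sketch of that computation, reimplementing the window-start arithmetic in plain Java so it runs without Flink on the classpath; in a real job you would more likely call `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)` directly. The class name `WindowBounds` is hypothetical, and the one-minute size is taken from the tumbling window in Flavio's example.

```java
final class WindowBounds {

    private WindowBounds() {}

    // Start of the tumbling window of length windowSize (ms), shifted by offset (ms), that contains timestamp.
    static long windowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, windowSize).
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // Exclusive end of the same window.
    static long windowEnd(long timestamp, long offset, long windowSize) {
        return windowStart(timestamp, offset, windowSize) + windowSize;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // Processing time: take the wall clock, as suggested above.
        long now = System.currentTimeMillis();
        System.out.println("[" + windowStart(now, 0L, oneMinute) + ", " + windowEnd(now, 0L, oneMinute) + ")");
    }
}
```

The start and end values computed this way could then be passed as arguments to the table function used in the LATERAL JOIN.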

With the start and end time, you can then perform a LATERAL JOIN to enrich
the information. You can add a cache in your table function to avoid
contacting the REST endpoint too frequently.
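The cache could look roughly like this sketch: a bounded LRU map built on `LinkedHashMap` in access order, keyed by whatever the REST call depends on (for example the user id plus the window start). `LruCache` and its loader function are hypothetical stand-ins, not a Flink API; inside a table function it would typically be created in `open()` and consulted from `eval`. Each parallel instance would hold its own copy, and the class is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class LruCache<K, V> {

    private final Map<K, V> entries;
    private final Function<K, V> loader; // e.g. the (hypothetical) REST lookup
    private int misses;

    LruCache(int maxEntries, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder = true orders entries by last access, so the eldest entry is the least recently used.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    V get(K key) {
        V value = entries.get(key);
        if (value == null) { // not cached or already evicted: call the loader once and remember the result
            misses++;
            value = loader.apply(key);
            entries.put(key, value);
        }
        return value;
    }

    int misses() {
        return misses;
    }
}
```

Only cache misses reach the endpoint, so the number of REST calls is bounded by the number of distinct keys that fall out of the cache.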

Best, Hequn


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Flavio Pompermaier
Hi Hequn, thanks for your answer.
What I'm trying to do is to read a stream of events that basically contains
a UserId field and, every X minutes (e.g. using a time window) and for each
different UserId key, query 3 different REST services to enrich my POJOs*.
For the moment, I use a ProcessWindowFunction after the .keyBy().window()
call, as shown in the example in my previous mail, to contact those 3
services and enrich my object.

However, I don't like this solution because I'd like to use Flink to its
full potential, so I'd like to enrich my object using LATERAL TABLEs or
Async I/O.
The main problem I'm facing right now is that I can't find a way to pass
the tumbling window start/end to the LATERAL JOIN table functions (because
this is a parameter of the REST query).
Moreover, I don't know whether this use case is something that the Table
API aims to solve.

* Of course this could kill the REST endpoint if the number of users is
very big. Because of this I'd like to keep the external state of the source
tables as internal Flink state and then do a JOIN on the UserId. The
problem here is that I need to "materialize" them using Debezium (or
similar) via Kafka and dynamic tables. Is there any example of keeping
multiple tables synced with Flink state through Debezium (without the need
to rewrite all the logic for managing UPDATE/INSERT/DELETE)?


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Hequn Cheng
Hi Flavio,

Nice to hear your ideas on Table API!

Could you be more specific about your requirements? A detailed scenario
would be quite helpful. For example, do you want to emit multiple records
through the collector, or do you want to use timers?

BTW, the Table API recently introduced flatAggregate (both non-window
flatAggregate and window flatAggregate), which will be included in the
upcoming release 1.9. flatAggregate can emit multiple records for a single
group. More details here [1][2].
Hope this can solve your problem.
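As a rough, Flink-free illustration of "emit multiple records for a single group": a regular aggregate returns one value per key, whereas a table aggregate such as the Top-2 example in the table-aggregation documentation can return several rows per key. This sketch only models that semantics over in-memory lists; `Top2PerGroup` is a hypothetical name and this is not the `TableAggregateFunction` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class Top2PerGroup {

    private Top2PerGroup() {}

    // Aggregate-style: exactly one value (the maximum) per non-empty group.
    static Map<String, Integer> maxPerGroup(Map<String, List<Integer>> groups) {
        Map<String, Integer> result = new TreeMap<>();
        groups.forEach((key, values) -> result.put(key, Collections.max(values)));
        return result;
    }

    // Table-aggregate-style: up to two output rows per group, largest first.
    static Map<String, List<Integer>> top2PerGroup(Map<String, List<Integer>> groups) {
        Map<String, List<Integer>> result = new TreeMap<>();
        groups.forEach((key, values) -> {
            List<Integer> sorted = new ArrayList<>(values);
            sorted.sort(Comparator.reverseOrder());
            result.put(key, new ArrayList<>(sorted.subList(0, Math.min(2, sorted.size()))));
        });
        return result;
    }
}
```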

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions


Table API and ProcessWindowFunction

2019-07-08 Thread Flavio Pompermaier
Hi to all,
from what I understand, a ProcessWindowFunction can only be used in the
Streaming API.
Are there any plans to port it to the Table API as well (in the near future)?
I'd like to do the equivalent of the following with the Table API:

final DataStream events = env.addSource(src);
events.filter(e -> e.getCode() != null)
    .keyBy(event -> Integer.valueOf(event.getCode()))
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction() { ... });

Best,
Flavio


Re: processWindowFunction

2018-08-20 Thread antonio saldivar
Maybe the usage of that function has changed; now I have to use it as described in [1].


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction


Re: processWindowFunction

2018-08-20 Thread vino yang
Hi antonio,

Oh, if you can't use KeyedProcessFunction, that's a pity.
In that case you can use MapState, using its keys to store the key of your
partition.
But I am not sure whether this will achieve the effect you want.

Thanks, vino.


Re: processWindowFunction

2018-08-20 Thread antonio saldivar
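Hello,

Thank you for the information. For some reason KeyedProcessFunction is not
available in my Flink version (1.4.2); I can only find ProcessFunction, so I
am working like this:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Object stands in for the actual transaction type.
public class TxnProcessFn extends ProcessFunction<Object, Object> {

    private ValueState<Object> state1;
    private ValueState<Object> state2;
    private ValueState<Object> state3;

    @Override
    public void open(Configuration parameters) throws Exception {
        state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState1", Object.class));
        state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState2", Object.class));
        state3 = getRuntimeContext().getState(new ValueStateDescriptor<>("objState3", Object.class));
    }

    @Override
    public void processElement(Object obj, Context ctx, Collector<Object> out) throws Exception {
        Object current = state1.value();
        if (current == null) {
            current = new Object();
            current.id = obj.id();
        }
    }
}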


Re: processWindowFunction

2018-08-20 Thread vino yang
Hi antonio,

First, I suggest you use KeyedProcessFunction if you have a keyBy-like
operation.
The implementation is similar to a fixed window.
You can create three state collections and determine, for each element,
which state collection its timestamp belongs to.
When the trigger fires, the elements in the collections are evaluated.
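A plain-Java sketch of that per-key bookkeeping, applied to the rules antonio describes elsewhere in this thread (a sum of at least $200 and at least 3 events within 10, 20 or 60 minutes; both thresholds are assumptions read from his message). It keeps one time-ordered buffer instead of three separate collections, since the 10- and 20-minute ranges are subsets of the 60-minute one. In a Flink job the buffer would live in keyed state (for example `ListState`) and eviction could be driven by timers; here it is an in-memory deque, amounts are in cents, timestamps are assumed to arrive in order, and `ThresholdTracker` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ThresholdTracker {

    private static final long MINUTE_MS = 60_000L;
    private static final int[] HORIZONS_MIN = {10, 20, 60}; // time ranges from the rules, shortest first
    private static final long MIN_SUM_CENTS = 20_000L;       // assumed: $200.00 in total
    private static final int MIN_COUNT = 3;                  // assumed: at least 3 events

    // Entries are {timestampMs, amountCents}, oldest first; stands in for keyed ListState.
    private final Deque<long[]> buffer = new ArrayDeque<>();

    // Adds an event and returns the shortest horizon (minutes) whose rule fires, or 0 if none does.
    int add(long timestampMs, long amountCents) {
        buffer.addLast(new long[] {timestampMs, amountCents});
        // Purge events outside the longest (60-minute) range so the state does not keep growing.
        long cutoff = timestampMs - HORIZONS_MIN[HORIZONS_MIN.length - 1] * MINUTE_MS;
        while (!buffer.isEmpty() && buffer.peekFirst()[0] <= cutoff) {
            buffer.pollFirst();
        }
        for (int minutes : HORIZONS_MIN) {
            if (matches(timestampMs - minutes * MINUTE_MS)) {
                return minutes;
            }
        }
        return 0;
    }

    // True if the events newer than `since` reach both the amount and the count threshold.
    private boolean matches(long since) {
        long sum = 0;
        int count = 0;
        for (long[] event : buffer) {
            if (event[0] > since) {
                sum += event[1];
                count++;
            }
        }
        return sum >= MIN_SUM_CENTS && count >= MIN_COUNT;
    }
}
```

The returned horizon tells the caller which values to set on the object before forwarding it; a result of 0 means the object is forwarded unchanged.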

Thanks, vino.


Re: processWindowFunction

2018-08-19 Thread antonio saldivar
Thank you for the references.

I now have my ProcessFunction and am getting the state, but how can I group
the elements by the threshold times? Also, since this is a global window,
how do I purge it? Otherwise it is going to keep growing.


Re: processWindowFunction

2018-08-19 Thread vino yang
Hi antonio,

Regarding your scenario, I think you could consider using a
ProcessFunction (or a keyed ProcessFunction) directly on the stream [1].
It can handle each of your elements with a timer, and you can combine it
with Flink's state API [2] to store your data.

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#process-function-low-level-operations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#working-with-state
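The timer-plus-state pattern can be modelled without Flink, as in this simplified sketch: per-key sums per window start stand in for keyed state, a sorted map of firing times stands in for the timer service, and advancing the watermark fires due timers, emits the result and clears that window's state, which is also how the state gets purged instead of growing forever. This is not Flink's `KeyedProcessFunction` API; `TimerWindowSim` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TimerWindowSim {

    private final long size;
    // key -> (window start -> running sum); stands in for keyed state.
    private final Map<String, Map<Long, Long>> state = new HashMap<>();
    // firing time -> keys with a timer registered; stands in for the timer service.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    TimerWindowSim(long size) {
        this.size = size;
    }

    void processElement(String key, long ts, long value) {
        long start = ts - Math.floorMod(ts, size);
        state.computeIfAbsent(key, k -> new HashMap<>()).merge(start, value, Long::sum);
        long fireAt = start + size - 1; // last timestamp that belongs to the window
        List<String> keys = timers.computeIfAbsent(fireAt, t -> new ArrayList<>());
        if (!keys.contains(key)) {
            keys.add(key);
        }
    }

    // Fires every timer at or before the watermark, emitting "key@start=sum" and clearing that state.
    List<String> advanceWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> timer = timers.pollFirstEntry();
            long start = timer.getKey() + 1 - size;
            for (String key : timer.getValue()) {
                Long sum = state.get(key).remove(start); // emit, then purge
                out.add(key + "@" + start + "=" + sum);
            }
        }
        return out;
    }
}
```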

Thanks, vino.

On Sun, Aug 19, 2018 at 10:18 AM, antonio saldivar wrote:

> Hi Vino,
>
> Is it possible to use a global window and then, in the trigger's onElement,
> compare the element that has arrived against, for example, 10, 20 and 60
> minutes of data?
>
> I have rules that evaluate the sum of amounts over 10, 20 or 60 minutes for
> the same keyed element: if the same id sums to something like $200 in total
> within those thresholds and the count is greater than or equal to 3, I need
> to set some values on the object. If the object does not reach those
> thresholds, I do not set the values, and I keep sending the output with or
> without those values.
>
> I am just processing the object on the fly and sending the output.
>
> On Fri, Aug 17, 2018 at 22:14, vino yang wrote:
>
>> Hi antonio,
>>
>> Yes, ProcessWindowFunction is a very low-level window function.
>> It gives you access to the data in the window and lets you customize
>> the output of the window.
>> So while it gives you flexibility, you need to think about other things,
>> which may require you to write more processing logic.
>>
>> Generally speaking, sliding windows usually contain some repeated data,
>> but a common pattern is to apply a reduce function to them to get your
>> calculation results.
>> If you only send the data on, there will definitely be some duplication.
>>
>> Thanks, vino.
>>
>> On Fri, Aug 17, 2018 at 12:01 PM, antonio saldivar wrote:
>>
>>> Hi Vino,
>>> Thank you for the information. Actually, I am using a trigger alert and a
>>> ProcessWindowFunction to send my results, but when my window slides or ends
>>> it sends the objects again and I am getting duplicated data.
>>>
>>> On Thu, Aug 16, 2018 at 22:05, vino yang wrote:
>>>
>>>> Hi Antonio,
>>>>
>>>> What results don't you want to get when each window is created?
>>>> Examples of ProcessWindowFunction usage are included in many test
>>>> files in the Flink project, such as SideOutputITCase.scala or
>>>> WindowTranslationTest.scala.
>>>>
>>>> For more information on ProcessWindowFunction, you can refer to the
>>>> official documentation [1].
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>>>>
>>>> Thanks, vino.
>>>>
>>>> On Fri, Aug 17, 2018 at 6:24 AM, antonio saldivar wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am implementing a data stream that uses sliding windows, but I am
>>>>> stuck because I need to set values on my object based on some if
>>>>> statements in my process function and send the object to the next step,
>>>>> but I don't want results every time a window is created.
>>>>>
>>>>> If anyone has a good example of this, it would help me.
>>>>>
>>>>


Re: processWindowFunction

2018-08-18 Thread antonio saldivar
Hi Vino,

Is it possible to use a global window and have the trigger's onElement
compare each newly arrived element against, for example, the last 10, 20,
and 60 minutes of data?

I have rules that evaluate the sum of amounts over 10, 20, or 60 minutes
for the same key. If the same ID reaches a total of, say, $200 within one
of those time spans and its count is 3 or more, I need to set some values
on the object. If the object does not reach those thresholds, I do not set
the values. Either way, I keep sending the object downstream, with or
without those values.

I just process each object on the fly and send the output.
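A minimal plain-Java sketch of the rule evaluation described above, with no Flink dependency. It assumes timestamps arrive in non-decreasing order per key and that "reaching" a threshold means >=; both are assumptions, as are the names AmountRules, Rule and onEvent. In a Flink job this state would presumably live per key inside something like a KeyedProcessFunction rather than in a plain object. The caller would set the extra values only when the returned list is non-empty, but still emit every object.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-key rule evaluator. In Flink, one instance of this state would
// exist per key (e.g. inside a KeyedProcessFunction); here it is a plain object.
class AmountRules {

    /** One rule: within horizonMs, the sum must be >= minSum and the count >= minCount. */
    static final class Rule {
        final String name;
        final long horizonMs;
        final double minSum;
        final int minCount;

        Rule(String name, long horizonMs, double minSum, int minCount) {
            this.name = name;
            this.horizonMs = horizonMs;
            this.minSum = minSum;
            this.minCount = minCount;
        }
    }

    private static final class Event {
        final long timestamp;
        final double amount;

        Event(long timestamp, double amount) {
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    private final List<Rule> rules;
    private final long maxHorizonMs;
    private final Deque<Event> history = new ArrayDeque<>(); // oldest event first

    AmountRules(List<Rule> rules) {
        this.rules = rules;
        long max = 0;
        for (Rule r : rules) {
            max = Math.max(max, r.horizonMs);
        }
        this.maxHorizonMs = max;
    }

    /** Records an event (timestamps assumed non-decreasing) and returns the names of matching rules. */
    List<String> onEvent(long timestamp, double amount) {
        history.addLast(new Event(timestamp, amount));
        // Events outside the longest horizon can never match again, so drop them.
        while (!history.isEmpty() && history.peekFirst().timestamp <= timestamp - maxHorizonMs) {
            history.removeFirst();
        }
        List<String> matched = new ArrayList<>();
        for (Rule rule : rules) {
            double sum = 0;
            int count = 0;
            for (Event e : history) {
                if (e.timestamp > timestamp - rule.horizonMs) { // inside (now - horizon, now]
                    sum += e.amount;
                    count++;
                }
            }
            if (sum >= rule.minSum && count >= rule.minCount) {
                matched.add(rule.name);
            }
        }
        return matched;
    }
}
```

With three rules of 10, 20 and 60 minutes (each 200 / 3), events of 100, 60 and 50 at minutes 0, 5 and 8 trigger all three rules on the third event, while a later small event can still match only the 60-minute rule.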


Re: processWindowFunction

2018-08-17 Thread vino yang
Hi Antonio,

Yes, ProcessWindowFunction is a very low-level window function. It gives
you access to all the data in the window and lets you fully customize the
window's output. That flexibility comes at a price: you have to handle
more details yourself, which may mean writing more processing logic.

Generally speaking, sliding windows overlap, so the same element is
assigned to several windows. A common pattern is to apply a reduce
function to each window to compute an aggregated result. If you only
forward the raw elements, you will inevitably get some duplicates.
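To make the overlap concrete, here is a plain-Java model (no Flink dependency) of how a sliding window of size 10 and slide 5 assigns each element to two windows, and how a per-window sum turns many elements into one result per window. To the best of my understanding, the start-time arithmetic mirrors what Flink's sliding window assigners do with a zero offset and non-negative timestamps; SlidingWindowModel and its methods are hypothetical names. In a real job the equivalent would be something like keyBy(...).window(SlidingEventTimeWindows.of(...)).reduce(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of sliding-window assignment (offset 0, non-negative timestamps)
// followed by a per-window reduce (here: a sum).
class SlidingWindowModel {

    /** Returns the start of every [start, start + size) window that contains timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Sums values per window: one result per window instead of one output per element. */
    static Map<Long, Long> sumPerWindow(long[] timestamps, long[] values, long size, long slide) {
        Map<Long, Long> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            for (long start : windowStarts(timestamps[i], size, slide)) {
                sums.merge(start, values[i], Long::sum);
            }
        }
        return sums;
    }
}
```

For example, windowStarts(7, 10, 5) yields [5, 0]: the element at time 7 is counted in both [5, 15) and [0, 10), which is exactly the overlap that produces duplicates when raw elements are forwarded instead of reduced.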

Thanks, vino.



Re: processWindowFunction

2018-08-16 Thread antonio saldivar
Hi Vino,
Thank you for the information. I am actually using an alert trigger and a
ProcessWindowFunction to send my results, but when my window slides or
ends, it sends the objects again and I am getting duplicated data.
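One hedged way to avoid re-sending the same objects is to remember which IDs were already emitted. The sketch below models that in plain Java; the class EmitOnce and the assumption that every object carries a unique String ID are hypothetical. In Flink the set would have to be keyed state (for example obtained through Context.globalState() in a ProcessWindowFunction), and it would need some cleanup, such as state TTL, so that it does not grow forever.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical de-duplication step: remembers which element IDs were already emitted,
// so an element that appears in several overlapping windows goes downstream only once.
class EmitOnce {
    private final Set<String> emittedIds = new HashSet<>(); // would be keyed state in Flink

    /** Called with the element IDs of one window firing; returns only IDs not seen before. */
    List<String> fire(Iterable<String> windowElementIds) {
        List<String> out = new ArrayList<>();
        for (String id : windowElementIds) {
            if (emittedIds.add(id)) { // add() returns false if the ID was already present
                out.add(id);
            }
        }
        return out;
    }
}
```

If a window containing "a" and "b" fires first and the next, overlapping window contains "b" and "c", only "c" is emitted the second time.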



Re: processWindowFunction

2018-08-16 Thread vino yang
Hi Antonio,

Which results do you not want to get each time a window is created?
Many test files in the Flink project contain examples of
ProcessWindowFunction, such as SideOutputITCase.scala and
WindowTranslationTest.scala.

For more information on ProcessWindowFunction, see the official
documentation [1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction

Thanks, vino.



processWindowFunction

2018-08-16 Thread antonio saldivar
Hello

I am implementing a data stream that uses sliding windows, but I am stuck.
I need to set values on my object based on some if statements in my
process function and send the object to the next step, but I don't want
results every time a window is created.

If anyone has a good example of this, it would help me.


ProcessWindowFunction

2018-08-08 Thread 苗元君
I read the doc about ProcessWindowFunction
But I think the code in the Flink demo is incorrect:

public class MyProcessWindowFunction extends
ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow>
{

Tuple cannot take two type parameters.
I tried to find a window word count demo that uses ProcessWindowFunction,
but I cannot find a single complete, correct example.

Can anyone show me how to use a ProcessWindowFunction in a windowed word
count, i.e. with .process(ProcessWindowFunction)?

-- 

*Yuanjun Miao*


Re: Access to time in aggregation, or aggregation in ProcessWindowFunction?

2017-06-20 Thread Nico Kruber
Hi William,
I'm not quite sure what you are trying to achieve...

What constitutes a "new event"? Is this based on some key? If so, you may
group on that key, create a window and use a custom trigger [1] instead,
where you can react in onElement(), set up an event-time timer for the
first element, and then react in onEventTime() for your timeout.
A ProcessFunction [2] (without a window) looks like a better solution,
though, depending on the details.
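The ProcessFunction-plus-timer pattern suggested above can be modelled in plain Java roughly as follows. This is a sketch under assumptions: TimeoutAggregator, onElement and advanceWatermark are hypothetical names, the HashMap stands in for keyed state and the PriorityQueue for Flink's timer service. Only a running count and sum are kept per key, not the events themselves; an update is emitted for every element, and a timer registered at (first timestamp + timeout) emits the final result and clears the state once the watermark reaches it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java model of "keyed process function + event-time timer" (no Flink dependency).
class TimeoutAggregator {
    private static final class PendingTimer {
        final long time;
        final String key;

        PendingTimer(long time, String key) {
            this.time = time;
            this.key = key;
        }
    }

    private final long timeoutMs;
    private final Map<String, long[]> state = new HashMap<>(); // key -> {count, sum}
    private final PriorityQueue<PendingTimer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    final List<String> output = new ArrayList<>();

    TimeoutAggregator(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Like processElement(): update the aggregate and emit an update immediately. */
    void onElement(String key, long timestamp, long value) {
        long[] acc = state.get(key);
        if (acc == null) {
            // First element for this key: the expiration time is derived from its timestamp.
            acc = new long[] {0L, 0L};
            state.put(key, acc);
            timers.add(new PendingTimer(timestamp + timeoutMs, key));
        }
        acc[0]++;
        acc[1] += value;
        output.add("update " + key + " count=" + acc[0] + " sum=" + acc[1]);
    }

    /** Like a watermark arriving: fire every timer whose time is <= watermark (onTimer()). */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().time <= watermark) {
            PendingTimer t = timers.poll();
            long[] acc = state.remove(t.key); // clear the per-key state after the final update
            output.add("final " + t.key + " count=" + acc[0] + " sum=" + acc[1]);
        }
    }
}
```

Late elements that arrive after their key's timer has fired simply start a new aggregate in this sketch; a real job would need to decide how to treat them.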


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html






Access to time in aggregation, or aggregation in ProcessWindowFunction?

2017-06-20 Thread William Saar
Hi,
I am looking to implement a window that sends out updates for each new
event it receives and also when an expiration timer fires and purges
the window (the expiration time can be determined from a timestamp in
the first event). 

I can't figure out a way to do this that does not require preserving
all events in the window. It seems I would either need to be able to
check the current watermark when an aggregation or its window function
is evaluated to be able to fire the final update when the timer fires,
or I would need the ProcessWindowFunction (where I do have access to
the time) to not preserve all elements in the window.

The only way I've come up with to implement this is to use a
ProcessWindowFunction that keeps state to only send out updates for
new elements in the elements iterable. The ProcessWindowFunction then
also sends out an update when the first element timestamp meets the
expiration condition, or if the elements iterable parameter does not
contain any new elements (deducing that the processing must have been
triggered by a timer invocation and not a new element). Is there a
better way to do this?
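The approach described above can be sketched in plain Java as follows, mainly to show its cost: the window still hands over all of its elements on every firing. IncrementalWindowEmitter is a hypothetical name, the counter stands in for per-window state (for example Context.windowState()), and the code assumes the elements come back in arrival order, which is an assumption rather than a documented guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a window function that reports only elements it has not reported
// yet, and treats a firing with no new elements as the timer-driven final firing.
class IncrementalWindowEmitter {
    private int alreadyEmitted = 0; // would be per-window state in Flink

    /** Called on every firing with the full window contents (assumed in arrival order). */
    List<String> process(List<String> windowElements) {
        List<String> out = new ArrayList<>();
        if (windowElements.size() == alreadyEmitted) {
            // Nothing new arrived, so this firing must have come from the expiration timer.
            out.add("final:" + windowElements.size());
            return out;
        }
        for (int i = alreadyEmitted; i < windowElements.size(); i++) {
            out.add("new:" + windowElements.get(i));
        }
        alreadyEmitted = windowElements.size();
        return out;
    }
}
```

The window buffer still grows with every event, which is the drawback raised in the question; an aggregate kept in keyed state with a timer avoids storing the events at all.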

Thanks,
William