Re: Need help using AggregateFunction instead of FoldFunction

2019-12-11 Thread Arvid Heise
Hi Devin, for event-time based windows, you need to give Flink two types of information: - the timestamp of each record, which I assume is in your case already embedded into the Pulsar records - and a watermark assigner. The watermarks help Flink to determine when windows can be closed with respect to
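
To make the role of watermarks concrete, here is a minimal plain-Java sketch. It is a simplified model, not Flink's implementation: it assumes a bounded-out-of-orderness strategy (watermark = highest timestamp seen minus an allowed delay) and that a window [start, end) may fire once the watermark reaches end - 1. The class name, method names, and sample values are hypothetical.

```java
// Simplified model of event-time progress; this is NOT Flink code.
class WatermarkSketch {

    /** Watermark under an assumed bounded-out-of-orderness strategy. */
    static long watermark(long maxTimestampSeen, long maxOutOfOrdernessMs) {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** A window [start, end) can fire once the watermark reaches its last timestamp, end - 1. */
    static boolean canFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long[] eventTimestamps = {1_000, 3_000, 2_500, 6_200, 12_000}; // sample data (assumed)
        long maxOutOfOrdernessMs = 2_000;                              // assumed delay bound
        long windowEnd = 5_000;                                        // window [0, 5000)

        long maxSeen = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            // Track the highest timestamp first, then derive the watermark from it.
            maxSeen = Math.max(maxSeen, ts);
            long wm = watermark(maxSeen, maxOutOfOrdernessMs);
            System.out.println("event=" + ts + " watermark=" + wm
                    + " windowCanFire=" + canFire(windowEnd, wm));
        }
    }
}
```

In this model, no window fires if no watermark is ever produced, which is one possible reason for seeing no output from an event-time window.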

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-10 Thread Devin Bost
I confirmed that I got no output after 20 seconds, nor after sending additional data with more than a minute between batches. My code looks like this: PulsarSourceBuilder builder = PulsarSourceBuilder .builder(new SimpleStringSchema()) .serviceUrl(SERVICE_URL)

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-10 Thread Arvid Heise
getResult will only be called when the window is triggered. For a fixed-time window, it triggers at the end of the window. However, for EventTimeSessionWindows you need to have gaps in the data. Can you verify that there is actually a 20-second pause in between data points for your keys? Additionally,
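
The gap requirement can be illustrated with a small plain-Java sketch that does not use Flink. It models one key's timestamps, starts a new session whenever the pause since the previous event is longer than the gap, and treats a session as closed only once a watermark has passed the last event plus the gap. The class name, method names, and the behaviour at a pause of exactly the gap are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of session windows for a single key; this is NOT Flink code.
class SessionGapSketch {

    /** Splits timestamps into sessions; a pause longer than gapMs starts a new session. */
    static List<List<Long>> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    /** A session ends at lastEvent + gap (exclusive); it closes once the watermark reaches that end - 1. */
    static boolean sessionClosed(List<Long> session, long gapMs, long watermark) {
        long lastEvent = session.get(session.size() - 1);
        return watermark >= lastEvent + gapMs - 1;
    }

    public static void main(String[] args) {
        long gapMs = 20_000;                                  // 20-second gap, as in the thread
        long[] timestamps = {0, 5_000, 10_000, 40_000};       // sample data (assumed)
        long watermark = 40_000;                              // assumed current watermark

        for (List<Long> session : sessions(timestamps, gapMs)) {
            System.out.println(session + " closed=" + sessionClosed(session, gapMs, watermark));
        }
    }
}
```

With a steady stream of events for a key and no pause longer than the gap, this model never closes the session, so a result would never be emitted for it.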

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-08 Thread vino yang
Hi dev, The time of the window may have different semantics. In a session window, it is only a time gap; the size of the window is driven by activity events. In a tumbling or sliding window, it is the size of the window. For more details, please see the official documentation.[1] Best,

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-06 Thread devinbost
I think there might be a bug in `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))` (unless I'm just not using it correctly) because I'm able to get output when I use the simpler window `.timeWindow(Time.seconds(5))`. However, I don't get any output when I use the session-based window.

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-05 Thread devinbost
They released Pulsar 2.4.2, and I was able to pull its dependencies and successfully submit the Flink job. It's able to receive messages from the Pulsar topic successfully. However, I still don't think I'm using the AggregateFunction correctly. I added logging statements everywhere in my code,

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-05 Thread Chris Miller
To: user@flink.apache.org Sent: 05/12/2019 04:35:05 Subject: Re: Need help using AggregateFunction instead of FoldFunction It turns out that the exception that I was getting is actually related to Pulsar since I'm using the Pulsar Flink connector. I found the exact issue reported here: https://github.com/apa

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread devinbost
It turns out that the exception that I was getting is actually related to Pulsar since I'm using the Pulsar Flink connector. I found the exact issue reported here: https://github.com/apache/pulsar/issues/4721 devinbost wrote > I was able to make more progress (based on the documentation you >

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread devinbost
Thanks for the help. I was able to make more progress (based on the documentation you provided), but now I'm getting this exception: org.apache.pulsar.client.impl.DefaultBatcherBuilder@3b5fad2d is not serializable. The object probably contains or references non serializable fields.

Re: Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread vino yang
Hi devinbost, Sharing two example links with you: - the example code in the official documentation[1]; - a StackOverflow answer to a similar question[2]; [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction [2]:

Need help using AggregateFunction instead of FoldFunction

2019-12-04 Thread devinbost
Hi, In my use case, I am attempting to create a keyedStream (on a string) and then window that stream (which represents keyed JSON objects) with EventTimeSessionWindows (so that I have a separate window for each set of JSON messages, according to the key), and then concatenate the JSON objects by
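
As a rough illustration of the aggregation described here, the sketch below uses a local stand-in interface with the same four methods as Flink's AggregateFunction<IN, ACC, OUT> (createAccumulator, add, getResult, merge), so that it compiles without Flink on the classpath. The aggregator class and its choice to emit the collected JSON strings as one JSON array are hypothetical; the original message does not say how the objects are joined.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the method set of Flink's AggregateFunction<IN, ACC, OUT>.
// Declared here only so the sketch is self-contained; it is not the Flink interface itself.
interface AggregateFunctionSketch<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical aggregator: collects JSON strings and emits them as a single JSON array.
class JsonConcatAggregate implements AggregateFunctionSketch<String, List<String>, String> {

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<String> add(String value, List<String> accumulator) {
        accumulator.add(value);
        return accumulator;
    }

    @Override
    public String getResult(List<String> accumulator) {
        // Called once per window when it fires, not once per element.
        return "[" + String.join(",", accumulator) + "]";
    }

    @Override
    public List<String> merge(List<String> a, List<String> b) {
        // Needed for merging windows such as session windows.
        a.addAll(b);
        return a;
    }
}

class JsonConcatDemo {
    public static void main(String[] args) {
        JsonConcatAggregate agg = new JsonConcatAggregate();
        List<String> acc = agg.createAccumulator();
        acc = agg.add("{\"id\":1}", acc);
        acc = agg.add("{\"id\":2}", acc);
        System.out.println(agg.getResult(acc));
    }
}
```

Unlike a fold, which takes an initial value and a single combining step, this shape separates accumulation from producing the result and adds a merge step for combining partial accumulators.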