Hi Anton,
in many scenarios, it might be better to just use a ProcessFunction, because
you might reach the limits of the built-in window functions very quickly.
ProcessFunction gives you full flexibility and you can put into state
what you like and set/fire timers when you think the time is appropriate.
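To make that concrete, here is a minimal, Flink-free sketch of the buffering logic such a ProcessFunction could implement (plain Java; the class and method names are illustrative stand-ins — in a real ProcessFunction you would keep the list in ListState and register timers via ctx.timerService()):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for ProcessFunction logic: processElement() puts
// events into state, onTimer() drains and emits the buffered batch.
class EventBuffer {
    private final List<String> state = new ArrayList<>();

    // Corresponds to processElement(): append the incoming event to state.
    void processElement(String event) {
        state.add(event);
    }

    // Corresponds to onTimer(): emit everything buffered so far and clear state.
    List<String> onTimer() {
        List<String> batch = new ArrayList<>(state);
        state.clear();
        return batch;
    }
}
```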
In general, timeWindowAll is not a good idea because there is no way of
parallelizing your stream and the window will be executed on one node.
Introducing artificial but stable keys is one way to distribute the
load. But you could also use rebalance() for this purpose.
However, I don't understand the `timestamp % parallelism` logic.
Regards,
Timo
On 15.01.21 11:53, Anton W. Haubner wrote:
Hello!
I hope this is the correct mailing list for newbie questions regarding
Flink stream processing.
Essentially, I have a question about how to apply a transformation to
each individual element of a sliding window in regular intervals.
I think a little background to the problem I'm trying to solve could be
helpful before asking the concrete question:
I have a service *A* which continuously produces events, and another
service *B* which accepts collections of processed events.
The collections accepted by the latter service are produced from the
events received within the last minute. So what I am currently doing is
using timeWindowAll to buffer a sliding window of 1 minute size and an
aggregate function which produces arrays of events from the windows.
These arrays are then sent to the consumer service:
<event source A>
...
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate(<collect events of window into an array>)
.addSink(<send to service B>)
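The `<collect events of window into an array>` step is essentially an append-only accumulator. A plain-Java sketch of that logic (in Flink this would implement the AggregateFunction interface; the names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the aggregate step: createAccumulator/add/getResult
// mirror the shape of Flink's AggregateFunction contract.
class CollectToArray {
    // Start with an empty buffer per window.
    List<String> createAccumulator() {
        return new ArrayList<>();
    }

    // Append each event of the window to the accumulator.
    List<String> add(String event, List<String> acc) {
        acc.add(event);
        return acc;
    }

    // When the window fires, turn the buffer into the array sent to service B.
    String[] getResult(List<String> acc) {
        return acc.toArray(new String[0]);
    }
}
```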
/This works,/ but I need to add another functionality: before being sent
off to the consumer service *B*, all events have to be annotated with a
value which needs to be computed based on the time that passed since the
event was produced. What I am currently doing is, is applying a map
function to the stream of produced arrays. This seems awfully
inefficient to me, since each call of the map function has to work on
the whole content of a window (now contained in the array):
<event source A>
...
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate(<collect events of window into an array>)
.map(<iterate over array, producing a new one with modified events>)
.addSink(<send to service B>)
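The annotation itself is a pure function of the event's age, so it can be expressed independently of where in the pipeline it runs (a sketch; the field names and the age-based value are illustrative, not my actual event type):

```java
// Illustrative event carrying its production timestamp plus the annotation,
// here simply the elapsed time since production in milliseconds.
class AnnotatedEvent {
    final long producedAtMillis;
    final long ageMillis;

    AnnotatedEvent(long producedAtMillis, long ageMillis) {
        this.producedAtMillis = producedAtMillis;
        this.ageMillis = ageMillis;
    }

    // Annotate an event with the time that passed since it was produced.
    static AnnotatedEvent annotate(long producedAtMillis, long nowMillis) {
        return new AnnotatedEvent(producedAtMillis, nowMillis - producedAtMillis);
    }
}
```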
Instead, I wonder whether it is possible to apply a map function to the
individual elements of a window. As I understand it, this is not currently possible
(http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filtering-and-mapping-data-after-window-opertator-td21449.html#a21458).
Another idea would be to add keyed windows before the timeWindowAll
using the event timestamp value modulo parallelism as keys and perform
the event transformation while aggregating each window into an array.
Then the computation could be performed in parallel on these smaller
windows and afterwards I join the produced arrays:
...
.keyBy(event -> event.timestamp % parallelism)
.timeWindow(Time.seconds(60), Time.seconds(1))
.reduce(<apply mapping and combine into array>)
.timeWindowAll(Time.seconds(60), Time.seconds(1))
.aggregate(<join arrays of last minute>)
.addSink(<consumer service B>)
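The key extractor I have in mind is just the event timestamp modulo the desired parallelism, which yields a small, stable set of keys 0..parallelism-1 (a sketch with an illustrative name; note that whether Flink actually spreads these few keys evenly over subtasks depends on how they hash into key groups):

```java
// Sketch of the key extractor used in .keyBy(...): consecutive millisecond
// timestamps cycle round-robin through the keys 0..parallelism-1.
class ModuloKey {
    static long keyOf(long timestampMillis, int parallelism) {
        return timestampMillis % parallelism;
    }
}
```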
What do you think of this idea? Is there a better way to handle this?
Thank you for your help.
Best regards,
Anton