Hi, I'm trying to sessionize an unbounded source of user events by assigning
a session_id to each event. In Beam terms: if a user's event belongs to a
particular Sessions window, all events in that window should be assigned the
same session_id.
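For reference, here is how I understand gap-based sessions to work, as a small plain-Java model with no Beam dependency (the class and method names are made up for illustration). Each event first gets its own window [timestamp, timestamp + gap), and overlapping windows for the same key are merged into one session. In Beam that merge only happens at a grouping step such as GroupByKey or Combine.perKey. This only mirrors the idea behind Beam's `Sessions`; it is not Beam's implementation, and edge cases such as windows that merely touch are not modeled faithfully.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model of gap-based session merging; hypothetical names, not Beam's implementation. */
public class SessionMergeSketch {

    /** Half-open interval [start, end) in epoch millis, loosely like Beam's IntervalWindow. */
    public static final class SessionWindow {
        public final long start;
        public final long end;

        SessionWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Gives each event timestamp its own window [ts, ts + gap), then merges overlapping ones. */
    public static List<SessionWindow> sessions(List<Long> timestamps, long gapMillis) {
        List<SessionWindow> proto = new ArrayList<>();
        for (long ts : timestamps) {
            proto.add(new SessionWindow(ts, ts + gapMillis));
        }
        proto.sort(Comparator.comparingLong((SessionWindow w) -> w.start));

        List<SessionWindow> merged = new ArrayList<>();
        for (SessionWindow w : proto) {
            SessionWindow last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w.start < last.end) {
                // Overlaps the current session: extend it instead of starting a new one.
                merged.set(merged.size() - 1,
                        new SessionWindow(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Events at minutes 0, 10 and 100 with a 30-minute gap: the first two share one session.
        System.out.println(sessions(List.of(0L, 10 * minute, 100 * minute), 30 * minute));
    }
}
```

This is also why a ParDo placed before the grouping step sees one unmerged window per event.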
tl;dr: Is it possible to access window metadata (e.g. the window start time)
in a CombineFn?
To tackle the above problem in Beam, I'm testing the following code:
1. Transform the PCollection of UserEvents into KVs of userId and UserEvent.
2. Define windows via Sessions.
3. Add a trigger to output events early (rather than waiting until the
window closes).
4. Use a CombineFn to accumulate all events and generate a sessionId.
5. Use a ParDo to assign the sessionId to each event.
PCollection<UserEvent> sessionizedEvents = userEvents
    .apply("ToKV", WithKeys.of(UserEvent::getUserId))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), UserEvent.UserEventCoder.of()))
    .apply("UserSessions", Window.<KV<String, UserEvent>>into(
            Sessions.withGapDuration(Duration.standardMinutes(30)))
        // Fire early: 60s of processing time after the first element in a pane
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(60))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(2)))
    // UserSession holds the list of all events received by the accumulator, plus the sessionId
    .apply("SessionId",
        Combine.<String, UserEvent, UserSession>perKey(new UserSessionCombineFn()))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), UserSessionCoder.of()))
    // Flattens all UserEvents in each UserSession and assigns the sessionId to each UserEvent
    .apply("ExtractEventsFromSession", TransformLibrary.sessionToEvents())
    .setCoder(UserEvent.UserEventCoder.of());
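My understanding of the accumulation mode (not verified against a runner) is that `.discardingFiredPanes()` makes each firing of the 60-second processing-time trigger pass only the events that arrived since that window's previous firing, while `.accumulatingFiredPanes()` would re-combine everything seen so far in the window. The toy model below (plain Java, no Beam, hypothetical names; "firings" are just batches of arrivals) shows the difference for two firings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of what a per-key combine is fed at each trigger firing; not Beam code. */
public class PaneModeSketch {

    /** Returns the element list a CombineFn would receive, one list per firing. */
    public static List<List<String>> panes(List<List<String>> arrivalsPerFiring,
                                           boolean accumulating) {
        List<List<String>> panes = new ArrayList<>();
        List<String> soFar = new ArrayList<>();
        for (List<String> arrivals : arrivalsPerFiring) {
            soFar.addAll(arrivals);
            // Discarding: only elements that arrived since the previous firing.
            // Accumulating: everything seen so far in this window.
            panes.add(accumulating ? new ArrayList<>(soFar) : new ArrayList<>(arrivals));
        }
        return panes;
    }

    public static void main(String[] args) {
        List<List<String>> arrivals = List.of(List.of("e1", "e2"), List.of("e3"));
        System.out.println("discarding:   " + panes(arrivals, false));
        System.out.println("accumulating: " + panes(arrivals, true));
    }
}
```

If this is right, accumulating panes would let the CombineFn see the whole session at each firing, at the cost of re-emitting earlier events downstream on every firing.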
Issues I'm facing:
1. The CombineFn is called for the set of events in one trigger firing,
without any information about the window those events belong to. So the
sessionId currently generated applies only to the events from one firing
instead of the whole window.
2. I'm thinking of generating the session id from the userId and the
window/session start time, but couldn't find any documentation saying
whether that's possible and, if so, how.
3. I also tried writing a ParDo before and after the CombineFn to access
the window metadata:
   1. Before the CombineFn: windows are not merged before a Combine/GroupBy
   operation, so each event has its own, separate window start time.
   2. After the CombineFn: I get an exception saying: "*expected window type
   from parameter (org.apache.beam.sdk.transforms.windowing.IntervalWindow)
   is not a supertype of actual window type assigned by windowing (W)*"
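For point 2, a sketch of what I have in mind, assuming the window start is available as epoch milliseconds in the ParDo after the Combine (in Beam that would presumably be `((IntervalWindow) window).start().getMillis()`). A name-based UUID over the userId and window start gives the same id for every firing of the same window. `SessionIdSketch` and `sessionId` are hypothetical names. One untested idea for the exception in 3.2 is to declare the window parameter of the @ProcessElement method as `BoundedWindow` and cast it to `IntervalWindow`, since the reported actual type is the unresolved type variable (W).

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.UUID;

/** Hypothetical helper: stable session id from user id + start of the merged session window. */
public class SessionIdSketch {

    public static String sessionId(String userId, long windowStartMillis) {
        String key = userId + "|" + Instant.ofEpochMilli(windowStartMillis);
        // Name-based (type 3) UUID: the same input always yields the same UUID, on any worker.
        return UUID.nameUUIDFromBytes(key.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        System.out.println(sessionId("user-42", 0L));
    }
}
```

Caveat: the start of a merged session window can move earlier if a late event arrives (allowed lateness is 2 days here) or when a new event bridges two sessions, so ids emitted by earlier firings could differ from later ones.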
Any pointers, articles, or code snippets on how to address the above problem
would be helpful.
--
Thanks & Regards,
*Jainik*