Hi, I also deployed the application to Dataflow and got the same result: the actual session durations did not match the configured session interval (60 minutes).
---
new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)
---

https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval

On Fri, Apr 24, 2020 at 3:09 PM Yohei Onishi wrote:

> Hi,
>
> I am trying to process data with a 60-minute session interval using the
> Apache Beam Python SDK. But the actual session durations were inaccurate,
> such as 3:00:00, 1:01:00, or 1:50:00, when I ran my application locally
> using DirectRunner.
>
> Would you help me find a way to fix this issue and process data with
> 60-minute sessions?
>
> I built my pipeline as below.
>
> -----
> with Pipeline(options=pipeline_options) as pipeline:
>     (
>         pipeline
>         | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
>         | "Convert" >> ParDo(Convert())
>         | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(
>             x, get_timestamp_from_element(x).timestamp()))
>         | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
>         | "Apply Session Window" >> WindowInto(
>             window.Sessions(known_args.session_interval))
>         | "Group" >> GroupByKey()
>         | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
>     )
>     result = pipeline.run()
>     result.wait_until_finish()
> -----
>
> session_interval (60 minutes) is provided as below.
>
> -----
> parser.add_argument(
>     "--session_interval",
>     help="Interval of each session",
>     default=60*60)  # 60 mins
> -----
>
> The WriteToCSV function processes data per session. I logged the session
> duration, but it was not accurate.
>
> -----
> class WriteToCSV(DoFn):
>     def __init__(self, output_path):
>         self.output_path = output_path
>
>     def process(self, element, window=DoFn.WindowParam):
>         window_start = window.start.to_utc_datetime()
>         window_end = window.end.to_utc_datetime()
>         duration = window_end - window_start
>         logging.info(">>> new %s record(s) in %s session (start %s end %s)",
>                      len(click_records), duration, window_start, window_end)
>         ....
> -----
>
> Then I got these log messages when I ran this application locally with
> DirectRunner.
>
> -----
> new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
> new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
> new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)
> -----
>
> Thanks.
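
For reference, a note on the durations logged above. In the Beam model, the value passed to window.Sessions is a gap duration, meaning the maximum idle time between consecutive elements of a key, not a fixed session length. Each element is first assigned a window [timestamp, timestamp + gap), and overlapping windows for the same key are then merged, so a session spans from its first element to its last element plus the gap. The sketch below imitates that merging in plain Python, without Beam. The helper merge_sessions and the event timestamps are hypothetical, chosen only for illustration; they are not the original input data.

```python
from datetime import datetime, timedelta


def merge_sessions(timestamps, gap):
    """Imitate gap-based session windowing for a single key.

    Each timestamp t gets a proto-window [t, t + gap); windows that
    overlap are merged into one session. This is a plain-Python
    illustration, not the Beam implementation.
    """
    sessions = []
    for t in sorted(timestamps):
        start, end = t, t + gap
        if sessions and start < sessions[-1][1]:
            # The new window overlaps the current session: extend its end.
            prev_start, prev_end = sessions[-1]
            sessions[-1] = (prev_start, max(prev_end, end))
        else:
            # The idle time reached the gap: start a new session.
            sessions.append((start, end))
    return sessions


# Hypothetical events for one user; every idle period is under 60 minutes.
events = [
    datetime(2018, 10, 19, 10, 0),
    datetime(2018, 10, 19, 10, 30),
    datetime(2018, 10, 19, 11, 15),
    datetime(2018, 10, 19, 11, 40),
    datetime(2018, 10, 19, 12, 0),
]

sessions = merge_sessions(events, timedelta(minutes=60))
for start, end in sessions:
    print(f"{end - start} session (start {start} end {end})")
# prints: 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)
```

Under this reading, a 1:01:00 session with two records would correspond to two events one minute apart (first event at 10:02, last event at 10:03, plus the 60-minute gap). If sessions of exactly 60 minutes are required, a fixed-size window such as window.FixedWindows(60 * 60) may be closer to the intent, assuming fixed boundaries are acceptable for the use case.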
