Hi,

I also deployed the application to Dataflow and got the same result:
the actual session durations did not match the configured session interval
(60 minutes).

---
new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)
---

https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval
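
For reference, here is a minimal plain-Python sketch of how session windowing
is usually described: each element gets a window [timestamp, timestamp + gap),
and overlapping windows for the same key are merged. Under that assumption the
logged duration would be (last timestamp - first timestamp) + gap rather than
the gap itself. This is only an illustration, not Beam's implementation; the
merge_sessions helper and the event timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def merge_sessions(timestamps, gap):
    """Merge per-element windows [t, t + gap) that overlap into sessions."""
    sessions = []
    for t in sorted(timestamps):
        start, end = t, t + gap
        if sessions and start < sessions[-1][1]:
            # The new window overlaps the current session, so extend it.
            sessions[-1] = (sessions[-1][0], max(sessions[-1][1], end))
        else:
            # No overlap: this element starts a new session.
            sessions.append((start, end))
    return sessions


gap = timedelta(minutes=60)
# Hypothetical event times for a single user, one minute apart.
events = [datetime(2018, 10, 19, 10, 2), datetime(2018, 10, 19, 10, 3)]
for start, end in merge_sessions(events, gap):
    print(end - start, start, end)
# prints: 1:01:00 2018-10-19 10:02:00 2018-10-19 11:03:00
```

If this model applies, two records one minute apart would produce the 1:01:00
window shown in the log above.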

On Fri, Apr 24, 2020 at 3:09 PM Yohei Onishi <[email protected]>
wrote:

> Hi,
>
> I am trying to process data with a 60-minute session interval using the
> Apache Beam Python SDK. However, the actual session durations were
> inaccurate (for example 3:00:00, 1:01:00, or 1:50:00) when I ran my
> application locally using DirectRunner.
>
> Could you help me find a way to fix this issue and process the data in
> 60-minute sessions?
>
> I built my pipeline as below.
>
> -----
> with Pipeline(options=pipeline_options) as pipeline:
>     (
>         pipeline
>         | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
>         | "Convert" >> ParDo(Convert())
>         | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(x, get_timestamp_from_element(x).timestamp()))
>         | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
>         | "Apply Session Window" >> WindowInto(window.Sessions(known_args.session_interval))
>         | "Group" >> GroupByKey()
>         | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
>     )
>     result = pipeline.run()
>     result.wait_until_finish()
> -----
>
> session_interval (60 minutes) is provided as below.
>
> -----
>     parser.add_argument(
>         "--session_interval",
>         type=int,  # parse a command-line value as integer seconds
>         help="Interval of each session",
>         default=60*60) # 60 mins
> -----
>
> The WriteToCSV function processes data per session. I logged the session
> duration, but it was not accurate.
>
> -----
> class WriteToCSV(DoFn):
>     def __init__(self, output_path):
>         self.output_path = output_path
>
>     def process(self, element, window=DoFn.WindowParam):
>         window_start = window.start.to_utc_datetime()
>         window_end = window.end.to_utc_datetime()
>         duration = window_end - window_start
>         logging.info(">>> new %s record(s) in %s session (start %s end %s)",
>                      len(click_records), duration, window_start, window_end)
>         ....
> -----
>
> Then I got these log messages when I ran this application locally with
> DirectRunner.
>
> -----
> new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
> new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
> new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)
> -----
>
> Thanks.
>
>
>
