[Question] Best practices about passing binary files between transforms

2021-07-07 Thread Ignacio Taranto
Hello everyone,

At my company we are considering using Apache Beam as part of our
Analytics system using the Python SDK.

Our dataset consists of an unbounded collection of TAR (gzipped)
archives which contain several JSON and binary files.
These TAR files need to be split into sub-categories, essentially
producing a new collection composed of smaller parts.
Our transforms will operate over this second collection.

The size of the compressed TAR archives is around 10 MiB, and the
largest binary files we have are around 16 MiB.
We only have a couple of these, the rest of the binary files are
smaller than that.

Also, in some cases, we may want some transformations to generate new
binary files from this collection.

The first problem I encountered is that there's no native way to
extract TAR archives, so my first approach was to unpack each TAR in
place (in a temporary directory) and then return the JSON files as
objects and the binary files as bytes.
But this crashes the Flink runner due to high memory consumption.
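For context, a lower-memory variant of that unpacking step would stream the archive member by member instead of materializing the whole unpacked tree. This is only a sketch: it assumes each pipeline element is the gzipped TAR payload as bytes, and the function name iter_tar_members is my own illustration.

```python
import io
import tarfile


def iter_tar_members(archive_bytes):
    """Yield (member_name, payload_bytes) pairs one member at a time.

    Only one member's contents is held in memory at any point,
    rather than the whole unpacked tree.
    """
    with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tar:
        for member in tar:
            if not member.isfile():
                continue
            extracted = tar.extractfile(member)
            if extracted is not None:
                yield member.name, extracted.read()
```

In a pipeline this could be wired in with beam.FlatMap(iter_tar_members), so each archive fans out into one element per contained file.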

Is there a way to pass large binary files along each instance of the pipeline?

I'm aware of fileio.py; I tried using WriteToFiles to write the
unpacked binary files, with no success.
Apparently WriteToFiles groups all the files' data into the same file.

I'm also aware that I can implement my own IO transforms using
FileBasedSource and FileBasedSink but it seems these classes are
"record oriented" which is not very useful for us.

Is Apache Beam the right framework for us?
Can we implement our system using Beam?

Thanks,
Ignacio.

Re: Help: Apache Beam Session Window with limit on number of events and time elapsed from window start

2021-07-07 Thread Kenneth Knowles
Hi Chandan,

I am moving this thread to user@beam.apache.org. I think that is the best
place to discuss.

Kenn

On Wed, Jul 7, 2021 at 9:32 AM Chandan Bhattad wrote:

> Hi Team,
>
> Hope you are doing well.
>
> I have a use case around session windowing with some customizations.
>
> We need to create user sessions based on *any* of the 3 conditions
> below
>
> 1. Session window of 30 minutes of inactivity (i.e. no event for 30
> minutes for a user)
> 2. Number of events in the session window reaches 20,000
> 3. 4 hours have elapsed since window start
>
> Below is what I have tried.
>
> beam.WindowInto(
>     window.Sessions(session_timeout_seconds),
>     trigger=trigger.Repeatedly(
>         trigger.AfterAny(
>             trigger.AfterCount(2),
>             trigger.DefaultTrigger(),
>             TriggerWhenWindowStartPassesXHours(hours=0.2)
>         )
>     ),
>     timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW,
>     accumulation_mode=trigger.AccumulationMode.DISCARDING
> )
>
>
> # Custom trigger implementation
> from apache_beam.transforms.trigger import DefaultTrigger
> from apache_beam.utils.timestamp import Timestamp
>
>
> class TriggerWhenWindowStartPassesXHours(DefaultTrigger):
>
>     def __init__(self, hours=4):
>         super(TriggerWhenWindowStartPassesXHours, self).__init__()
>         self.hours = hours
>
>     def __repr__(self):
>         return 'TriggerWhenWindowStartPassesXHours()'
>
>     def should_fire(self, time_domain, watermark, window, context):
>         should_fire = (Timestamp.now() - window.start).micros >= 36 * self.hours
>         return should_fire
>
>     @staticmethod
>     def from_runner_api(proto, context):
>         return TriggerWhenWindowStartPassesXHours()
>
> The above works well, but there is an issue: whenever trigger No. 3 above
> fires, it does not create a new session window; the same window is
> continued.
> As a result, No. 3 keeps firing on every subsequent event after 4 hours
> since window start, since the should_fire condition is:
>
> should_fire = (Timestamp.now() - window.start).micros >= 36 * self.hours
>
> and since window.start is never updated after the trigger first fires, it
> will fire for every subsequent event after the first trigger.
>
> I have also posted this on stackoverflow:
> https://stackoverflow.com/questions/68250618/apache-beam-session-window-with-limit-on-number-of-events
>
> I would be very grateful for any help as to how to achieve this.
> Thanks a lot in advance.
>
> Regards,
> Chandan
>