Thanks for the reply, Wiśniowski Piotr.

As you mentioned, the data can be big, and I am okay with writing multiple
files every 5 minutes.
If I assign a key that uniquely represents a worker, that makes sure all
the data that is processed together stays together. But the stateful DoFn
still introduces a shuffle, so the data can still be moved across nodes
after we assign the keys, right? Is there any way I can avoid that extra
shuffle?

Do you think it would instead be a good idea to have a separate scheduler
thread within the DoFn that periodically flushes the data from each worker
every 5 minutes?
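
For context, here is roughly what I have in mind (an untested sketch; the
names are made up and the actual file writing is elided):

import threading

import apache_beam as beam

class PeriodicFlushDoFn(beam.DoFn):
    """Sketch: buffer elements and flush them from a background thread."""

    FLUSH_INTERVAL_SECS = 300  # 5 minutes

    def setup(self):
        self._lock = threading.Lock()
        self._buffer = []
        self._stop = threading.Event()
        self._flusher = threading.Thread(target=self._flush_loop, daemon=True)
        self._flusher.start()

    def _flush_loop(self):
        # wait() returns False on timeout and True once _stop is set
        while not self._stop.wait(self.FLUSH_INTERVAL_SECS):
            self._flush()

    def _flush(self):
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            pass  # write `batch` to a uniquely named file here (elided)

    def process(self, element):
        # the lock serializes process() against the periodic flush
        with self._lock:
            self._buffer.append(element)

    def teardown(self):
        # best effort: flush whatever is left on shutdown
        self._stop.set()
        self._flush()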

Thanks,
Vamsi

On Tue, 27 Aug 2024 at 17:49, Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> If you require only a single file per 5 min, then what you are missing is
> that you need to window the data into fixed windows, so that the stateful
> DoFn can store the elements per key per window.
>
> Something like this (did not test this code, just cleaned-up pseudo code):
>
> import apache_beam as beam
> from apache_beam.coders import VarIntCoder
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms import userstate
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.window import FixedWindows
>
> class StatefulWriteToFileDoFn(beam.DoFn):
>     # pick a coder that matches your actual element type
>     BAG_STATE = userstate.BagStateSpec('bag_state', VarIntCoder())
>     TIMER = userstate.TimerSpec('timer', TimeDomain.WATERMARK)
>
>     def process(self, element, timestamp=beam.DoFn.TimestampParam,
>                 window=beam.DoFn.WindowParam,
>                 bag_state=beam.DoFn.StateParam(BAG_STATE),
>                 timer=beam.DoFn.TimerParam(TIMER)):
>         bag_state.add(element)
>         timer.set(window.end)  # fire once the watermark passes the window end
>
>     @userstate.on_timer(TIMER)
>     def on_timer(self, window=beam.DoFn.WindowParam,
>                  bag_state=beam.DoFn.StateParam(BAG_STATE)):
>         # generate a filename based on the window's end time, for example
>         end = window.end.to_utc_datetime().strftime('%Y%m%d%H%M%S')
>         filename = f'output_{end}.txt'
>         with open(filename, 'w') as f:
>             for element in bag_state.read():
>                 f.write(f'{element}\n')
>         bag_state.clear()  # release the state once the file is written
>
> def run():
>     # streaming mode is needed for the Pub/Sub source
>     with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
>         (p
>          | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
>          | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
>          | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))  # map to a single key
>          | 'StatefulWriteToFile' >> beam.ParDo(StatefulWriteToFileDoFn())
>         )
>
> So the stateful DoFn will keep its state per key per window, and each
> window should create its own output file. But note that this requires all
> of a window's 5 min of data to land on the same worker, as only a single
> worker can be responsible for creating the file, and hence the shuffle is
> required.
>
> There is a workaround if the traffic might be too big, but it means
> generating more files per 5 min window (one file per worker).
>
> The trick is to assign a key that uniquely represents the worker, not the
> data, so every worker that maps the key puts its own unique value into
> that key.
> See this example:
>
> import threading
> from typing import Any, Iterable, Tuple
> from uuid import uuid4
>
> from apache_beam import DoFn
>
> class _Key(DoFn):
>     def setup(self):
>         # one prefix per DoFn instance, i.e. per worker
>         self.shard_prefix = str(uuid4())
>
>     def process(self, x: Any) -> Iterable[Tuple[str, Any]]:
>         yield (
>             # the thread id separates batches within the same worker
>             self.shard_prefix + str(threading.get_ident()),
>             x,
>         )
> And then you can use it like
> | "KeyPerWorker" >> ParDo(_Key())
> instead of the constant key from the first approach. Also remember to
> make sure the file names are unique if you use this approach.
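> For example, in on_timer from the first snippet you could fold the key
> into the file name (untested sketch; beam.DoFn.KeyParam exposes the
> stateful key, and ShardedWriteToFileDoFn is just a made-up name reusing
> the class above):
>
> class ShardedWriteToFileDoFn(StatefulWriteToFileDoFn):
>     # same state and timer as the parent, only the file name changes
>     @userstate.on_timer(StatefulWriteToFileDoFn.TIMER)
>     def on_timer(self, key=beam.DoFn.KeyParam,
>                  window=beam.DoFn.WindowParam,
>                  bag_state=beam.DoFn.StateParam(
>                      StatefulWriteToFileDoFn.BAG_STATE)):
>         end = window.end.to_utc_datetime().strftime('%Y%m%d%H%M%S')
>         # the per-worker key makes the name unique even when several
>         # workers emit a file for the same window
>         filename = f'output_{key}_{end}.txt'
>         with open(filename, 'w') as f:
>             for element in bag_state.read():
>                 f.write(f'{element}\n')
>         bag_state.clear()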
> Best
> Wisniowski Piotr
>
> On 25.08.2024 20:30, vamsikrishna korada wrote:
>
> Hi everyone,
>
> I'm new to Apache Beam and have a question regarding its usage.
>
> I have a scenario where I need to read a stream of elements from a
> PCollection and write them to a new file every 5 minutes.
>
> Initially, I considered using Timers and state stores, but I discovered
> that Timers are only applicable to KV pairs. If I convert my PCollection
> into key-value pairs with a dummy key and then use timers, I run into
> several issues:
>
>    1. It introduces an additional shuffle.
>    2. With all elements sharing the same key, they would be processed by
>    a single task when running Beam on Flink. I prefer not to manually
>    define the number of keys based on the load, because I plan to run
>    multiple pipelines, each with a different load.
>
> One alternative I considered is using a custom executor thread within my
> Writer DoFn to flush the records every 5 minutes. However, this approach
> would require a lock to make sure that the process-element logic and the
> flush logic never run at the same time.
>
> Is there a more effective way to accomplish this?
>
>
>
> Thanks,
>
> Vamsi
>
