Thanks for the reply, Wiśniowski Piotr. As you mentioned, the data can be
big, and I am okay with writing multiple files every 5 minutes. If I assign
a key that uniquely represents a worker, that makes sure all the data that
is processed together stays together. But when we assign the keys, there
can still be a shuffle, and the data can collectively be moved across
nodes, right? Is there any way I can avoid the extra shuffle?

Do you think it would be a good idea to instead have a separate scheduler
thread within the DoFn that periodically flushes the data every 5 minutes
from each worker?
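Roughly what I have in mind is the untested sketch below; the class name,
the flush interval constant, and the local-file output are just
placeholders to illustrate the idea.

import threading
import time
import uuid

import apache_beam as beam


class PeriodicFlushWriteDoFn(beam.DoFn):
    # Sketch: buffer elements and flush them to a new file from a
    # background thread every FLUSH_INTERVAL_SECS.
    FLUSH_INTERVAL_SECS = 300  # 5 minutes

    def setup(self):
        self._lock = threading.Lock()
        self._buffer = []
        self._stop = threading.Event()
        # Daemon thread so it does not block worker shutdown.
        self._flusher = threading.Thread(target=self._flush_loop, daemon=True)
        self._flusher.start()

    def process(self, element):
        with self._lock:
            self._buffer.append(element)

    def _flush_loop(self):
        # Wake up every FLUSH_INTERVAL_SECS until teardown sets the stop event.
        while not self._stop.wait(self.FLUSH_INTERVAL_SECS):
            self._flush()

    def _flush(self):
        with self._lock:
            if not self._buffer:
                return
            elements, self._buffer = self._buffer, []
        # One file per worker per flush; the uuid keeps names unique across workers.
        filename = f'output_{uuid.uuid4()}_{int(time.time())}.txt'
        with open(filename, 'w') as f:
            for element in elements:
                f.write(f'{element}\n')

    def teardown(self):
        self._stop.set()
        self._flush()

I realize the background thread runs outside Beam's bundle lifecycle, so
there are no delivery guarantees here; the teardown flush is only there so
that buffered elements are not silently dropped on shutdown.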
Thanks,
Vamsi

On Tue, 27 Aug 2024 at 17:49, Wiśniowski Piotr <contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> If you require only a single file per 5 min, then what you are missing is
> that you need to window the data into fixed windows, so that the stateful
> DoFn can store the elements per key per window.
>
> Something like this (I did not test this code, it is just pseudo code):
>
> import apache_beam as beam
> from apache_beam.coders import VarIntCoder
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms import userstate
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.userstate import BagStateSpec, TimerSpec
> from apache_beam.transforms.window import FixedWindows
>
> class StatefulWriteToFileDoFn(beam.DoFn):
>     # Pick a coder that matches your element type
>     # (e.g. BytesCoder for raw Pub/Sub messages).
>     BAG_STATE = BagStateSpec('bag_state', VarIntCoder())
>     TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
>
>     def process(self, element, timestamp=beam.DoFn.TimestampParam,
>                 window=beam.DoFn.WindowParam,
>                 bag_state=beam.DoFn.StateParam(BAG_STATE),
>                 timer=beam.DoFn.TimerParam(TIMER)):
>         bag_state.add(element)
>         # Fire once the watermark passes the end of the window.
>         timer.set(window.end)
>
>     @userstate.on_timer(TIMER)
>     def on_timer(self, window=beam.DoFn.WindowParam,
>                  bag_state=beam.DoFn.StateParam(BAG_STATE)):
>         # Here you can generate a filename based on the window's end time,
>         # for example:
>         filename = f'output_{window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")}.txt'
>         with open(filename, 'w') as f:
>             for element in bag_state.read():
>                 f.write(f'{element}\n')
>
> def run():
>     # Streaming must be enabled for ReadFromPubSub.
>     with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
>         (p
>          | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
>          | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
>          | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))  # Map to a single key
>          | 'StatefulWriteToFile' >> beam.ParDo(StatefulWriteToFileDoFn()))
>
> So the stateful DoFn will keep its state per key per window, and each
> window should create its own output file. But note that this requires you
> to put all the 5 min data on the same worker, since only a single worker
> can be responsible for creating the file, and hence the shuffle is
> required.
>
> There is a workaround if the traffic is too big, but it means generating
> more files per 5 min window (one file per worker).
>
> The trick is to assign a key that uniquely represents the worker, not the
> data, so every worker that maps the key puts its own unique value into the
> key. See this example:
>
> import threading
> from typing import Iterable
> from uuid import uuid4
>
> input_type = bytes  # placeholder for whatever your element type is
>
> class _Key(beam.DoFn):
>     def setup(self):
>         self.shard_prefix = str(uuid4())
>
>     def process(
>         self,
>         x: input_type,
>     ) -> Iterable[tuple[str, input_type]]:
>         yield (
>             self.shard_prefix + str(threading.get_ident()),  # each worker creates its own batch
>             x,
>         )
>
> And then you can use it like
>
> | "KeyPerWorker" >> beam.ParDo(_Key())
>
> instead of using a constant key as in the first approach. Also remember to
> make sure the file names are unique if you use this approach.
>
> Best,
> Wiśniowski Piotr
>
> On 25.08.2024 20:30, vamsikrishna korada wrote:
> >
> > Hi everyone,
> >
> > I'm new to Apache Beam and have a question regarding its usage.
> >
> > I have a scenario where I need to read a stream of elements from a
> > PCollection and write them to a new file every 5 minutes.
> >
> > Initially, I considered using Timers and state stores, but I discovered
> > that Timers are only applicable to KV pairs. If I convert my PCollection
> > into a key-value pair with a dummy key and then use timers, I run into
> > several issues:
> >
> > 1. It introduces an additional shuffle.
> > 2. With all elements sharing the same key, they would all be processed
> >    by a single task in the Flink on Beam application.
> > I prefer not to manually define the number of keys based on load,
> > because I plan to run multiple pipelines, each with varying loads.
> >
> > One alternative I considered is using a custom executor thread within my
> > Writer DoFn to flush the records every 5 minutes. However, this approach
> > would require me to use a lock to make sure the process-element logic
> > and the flush logic never run at the same time.
> >
> > Is there a more effective way to accomplish this?
> >
> > Thanks,
> > Vamsi