I played with your example. Indeed, create_tracker in your ProcessFilesFn is never called, which is quite strange. I could not find any example of chained SDFs, which makes me wonder whether chaining SDFs works at all.

@Chamikara Jayalath <[email protected]> Any thoughts?

On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <[email protected]> wrote:

> Hello,
>
> I am building a pipeline using two SDFs that are chained. The first
> function (DirectoryWatchFn) checks a folder continuously and grabs if a new
> file is added. The second one (ProcessFilesFn) processes a file
> while splitting each line - the processing simply prints the file name and
> line number.
>
> The process function of the first SDF gets stuck if I yield a new file
> object. Specifically, although the second SDF is called as I can check the
> initial restriction is created, the tracker is not created at all!
>
> On the other hand, if I return the file object list, the second SDF works
> fine but the issue is the first SDF stops as soon as it returns the first
> list of files.
>
> The source of the pipeline can be found in
> - First SDF:
>   https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
> - Second SDF:
>   https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
> - Pipeline:
>   https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>
> Can you please inform me how to handle this issue?
>
> Cheers,
> Jaehyeon
>
> class DirectoryWatchFn(beam.DoFn):
>     POLL_TIMEOUT = 10
>
>     @beam.DoFn.unbounded_per_element()
>     def process(
>         self,
>         element: str,
>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>             DirectoryWatchRestrictionProvider()
>         ),
>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>             DirectoryWatchWatermarkEstimatorProvider()
>         ),
>     ) -> typing.Iterable[MyFile]:
>         new_files = self._get_new_files_if_any(element, tracker)
>         if self._process_new_files(tracker, watermark_estimater, new_files):
>             # return [new_file[0] for new_file in new_files] #<-- it doesn't get stuck but the SDF finishes
>             for new_file in new_files: #<--- it gets stuck if yielding file objects
>                 yield new_file[0]
>         else:
>             return
>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>
>     def _get_new_files_if_any(
>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>         new_files = []
>         for file in os.listdir(element):
>             if (
>                 os.path.isfile(os.path.join(element, file))
>                 and file not in tracker.current_restriction().already_processed
>             ):
>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>                 new_file = MyFile(file, 0, num_lines)
>                 print(new_file)
>                 new_files.append(
>                     (
>                         new_file,
>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>                     )
>                 )
>         return new_files
>
>     def _process_new_files(
>         self,
>         tracker: DirectoryWatchRestrictionTracker,
>         watermark_estimater: ManualWatermarkEstimator,
>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>     ):
>         max_instance = watermark_estimater.current_watermark()
>         for new_file in new_files:
>             if tracker.try_claim(new_file[0].name) is False:
>                 watermark_estimater.set_watermark(max_instance)
>                 return False
>             if max_instance < new_file[1]:
>                 max_instance = new_file[1]
>         watermark_estimater.set_watermark(max_instance)
>         return max_instance < MAX_TIMESTAMP
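
A side note on the commented-out `return [...]` variant in the quoted `process` method: one likely reason the first SDF finishes after the first batch is plain Python control flow. Returning the list exits `process` before `tracker.defer_remainder(...)` is reached, whereas the generator version reaches that call once its output has been fully consumed. Below is a minimal sketch of just that control flow. It does not use Beam at all; `FakeTracker`, `process_with_return` and `process_with_yield` are hypothetical stand-ins, and it does not explain why the `yield` variant gets stuck.

```python
class FakeTracker:
    """Hypothetical stand-in for a restriction tracker (not Beam's API)."""

    def __init__(self):
        self.deferred = False

    def defer_remainder(self):
        # Only records that a deferral was requested.
        self.deferred = True


def process_with_return(files, tracker):
    """Mirrors the commented-out variant: returns the whole list at once."""
    if files:
        return list(files)  # exits here, so the line below is skipped
    tracker.defer_remainder()


def process_with_yield(files, tracker):
    """Mirrors the yielding variant: a generator that falls through."""
    if files:
        for f in files:
            yield f
    else:
        return
    tracker.defer_remainder()  # reached after the last element is consumed


return_tracker = FakeTracker()
returned = process_with_return(["a.txt", "b.txt"], return_tracker)

yield_tracker = FakeTracker()
yielded = list(process_with_yield(["a.txt", "b.txt"], yield_tracker))

print(returned, return_tracker.deferred)
print(yielded, yield_tracker.deferred)
```

Both variants hand back the same two file names, but only the generator version records a deferral. If the list-returning form is preferred, calling `defer_remainder` before the `return` would be one thing to try, though I have not verified how a runner handles that.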
