I added this issue here: https://github.com/apache/beam/issues/24528#issuecomment-2095026324. However, we do not plan to fix this for the Python DirectRunner, since Prism will become the default local runner when it is ready.
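For readers landing on this thread: since the FlinkRunner is confirmed to work later in the thread, here is a minimal sketch of selecting a non-DirectRunner runner via standard Beam pipeline options. The module imports and the watched-folder path are assumptions based on the files linked below (directory_watch.py, file_read.py); adjust them to your layout, and check whether PrismRunner is available in your Beam version before relying on it.

    # A minimal sketch, not the exact pipeline from the thread: it only shows how
    # to select a non-DirectRunner runner with standard Beam pipeline options.
    # The imports below assume the linked directory_watch.py / file_read.py are
    # importable, and the input path is a hypothetical placeholder.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    from directory_watch import DirectoryWatchFn  # first SDF (linked below)
    from file_read import ProcessFilesFn          # second SDF (linked below)

    options = PipelineOptions(
        [
            "--runner=FlinkRunner",  # confirmed to work later in this thread
            # "--runner=PrismRunner",  # future default local runner, if available in your Beam version
            "--streaming",
        ]
    )

    with beam.Pipeline(options=options) as p:
        (
            p
            | beam.Create(["/path/to/watched/folder"])  # hypothetical watched directory
            | beam.ParDo(DirectoryWatchFn())
            | beam.ParDo(ProcessFilesFn())
        )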
On Sun, May 5, 2024 at 2:41 PM Jaehyeon Kim <[email protected]> wrote:

> Hi XQ
>
> Yes, it works with the FlinkRunner. Thank you so much!
>
> Cheers,
> Jaehyeon
>
> On Mon, 6 May 2024 at 02:49, XQ Hu via user <[email protected]> wrote:
>
>> Have you tried using other runners? I think this might be caused by some
>> gaps in the Python DirectRunner's support for streaming cases or SDFs.
>>
>> On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim <[email protected]> wrote:
>>
>>> Hi XQ
>>>
>>> Thanks for checking it out. SDF chaining seems to work, as I created my
>>> pipeline by converting a pipeline built with the Java SDK. The source of
>>> the Java pipeline can be found at
>>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>>>
>>> So far, when I yield outputs, the second SDF gets stuck, while it gets
>>> executed if I return them (but then the first SDF completes). If I change
>>> the second SDF into a plain DoFn without the tracker, it executes fine.
>>> I'm not sure what happens in the first scenario.
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> On Sun, 5 May 2024 at 09:21, XQ Hu via user <[email protected]> wrote:
>>>
>>>> I played with your example. Indeed, create_tracker in your
>>>> ProcessFilesFn is never called, which is quite strange. I could not find
>>>> any example that shows chained SDFs, which makes me wonder whether
>>>> chained SDFs work.
>>>>
>>>> @Chamikara Jayalath <[email protected]> Any thoughts?
>>>>
>>>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am building a pipeline using two SDFs that are chained. The first
>>>>> function (DirectoryWatchFn) checks a folder continuously and picks up
>>>>> any new file that is added. The second one (ProcessFilesFn) processes a
>>>>> file while splitting each line - the processing simply prints the file
>>>>> name and line number.
>>>>>
>>>>> The process function of the first SDF gets stuck if I yield a new file
>>>>> object. Specifically, although the second SDF is called, as I can see
>>>>> the initial restriction being created, the tracker is not created at
>>>>> all!
>>>>>
>>>>> On the other hand, if I return the file object list, the second SDF
>>>>> works fine, but the issue is that the first SDF stops as soon as it
>>>>> returns the first list of files.
>>>>>
>>>>> The source of the pipeline can be found at
>>>>> - First SDF:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>>>> - Second SDF:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>>>> - Pipeline:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>>>
>>>>> Can you please advise me on how to handle this issue?
>>>>>
>>>>> Cheers,
>>>>> Jaehyeon
>>>>>
>>>>> class DirectoryWatchFn(beam.DoFn):
>>>>>     POLL_TIMEOUT = 10
>>>>>
>>>>>     @beam.DoFn.unbounded_per_element()
>>>>>     def process(
>>>>>         self,
>>>>>         element: str,
>>>>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>>>>             DirectoryWatchRestrictionProvider()
>>>>>         ),
>>>>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>>>>>             DirectoryWatchWatermarkEstimatorProvider()
>>>>>         ),
>>>>>     ) -> typing.Iterable[MyFile]:
>>>>>         new_files = self._get_new_files_if_any(element, tracker)
>>>>>         if self._process_new_files(tracker, watermark_estimater, new_files):
>>>>>             # return [new_file[0] for new_file in new_files]  # <-- it doesn't get stuck but the SDF finishes
>>>>>             for new_file in new_files:  # <-- it gets stuck if yielding file objects
>>>>>                 yield new_file[0]
>>>>>         else:
>>>>>             return
>>>>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>>>
>>>>>     def _get_new_files_if_any(
>>>>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>>>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>>>>         new_files = []
>>>>>         for file in os.listdir(element):
>>>>>             if (
>>>>>                 os.path.isfile(os.path.join(element, file))
>>>>>                 and file not in tracker.current_restriction().already_processed
>>>>>             ):
>>>>>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>>>>>                 new_file = MyFile(file, 0, num_lines)
>>>>>                 print(new_file)
>>>>>                 new_files.append(
>>>>>                     (
>>>>>                         new_file,
>>>>>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>>>>>                     )
>>>>>                 )
>>>>>         return new_files
>>>>>
>>>>>     def _process_new_files(
>>>>>         self,
>>>>>         tracker: DirectoryWatchRestrictionTracker,
>>>>>         watermark_estimater: ManualWatermarkEstimator,
>>>>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>>>>     ):
>>>>>         max_instance = watermark_estimater.current_watermark()
>>>>>         for new_file in new_files:
>>>>>             if tracker.try_claim(new_file[0].name) is False:
>>>>>                 watermark_estimater.set_watermark(max_instance)
>>>>>                 return False
>>>>>             if max_instance < new_file[1]:
>>>>>                 max_instance = new_file[1]
>>>>>         watermark_estimater.set_watermark(max_instance)
>>>>>         return max_instance < MAX_TIMESTAMP
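For completeness, here is a minimal sketch of the workaround Jaehyeon mentions above (replacing the second SDF with a plain DoFn that has no restriction tracker), which the thread reports as executing fine. The MyFile field names (name, start, end) are inferred from MyFile(file, 0, num_lines) above, and parent_dir is a hypothetical constructor argument; both are assumptions, not taken from the linked file_read.py.

    # A minimal sketch of the plain-DoFn workaround described in the thread:
    # the second transform simply reads each emitted file and prints the file
    # name and line number, without any RestrictionParam/tracker.
    # Assumptions: MyFile(name, start, end) and a hypothetical parent_dir
    # argument pointing at the watched folder.
    import os
    import typing

    import apache_beam as beam


    class MyFile(typing.NamedTuple):
        name: str
        start: int
        end: int


    class ProcessFilesAsDoFn(beam.DoFn):
        def __init__(self, parent_dir: str):
            self.parent_dir = parent_dir

        def process(self, element: MyFile) -> typing.Iterable[str]:
            # Read the whole file eagerly; unlike the SDF version, this cannot
            # checkpoint or split within a file.
            with open(os.path.join(self.parent_dir, element.name)) as f:
                for line_num, _ in enumerate(f, start=element.start + 1):
                    out = f"{element.name} - {line_num}"
                    print(out)
                    yield out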
