I added a note about this to the existing issue here:
https://github.com/apache/beam/issues/24528#issuecomment-2095026324
However, we do not plan to fix this for the Python DirectRunner, since Prism
will become the default local runner once it is ready.
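
A workaround in the meantime is to run the pipeline on another runner. A
minimal sketch of switching runners via pipeline options (the option values
below are illustrative, not taken from the pipeline in this thread):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed values for illustration; adjust to your environment.
options = PipelineOptions(
    runner="FlinkRunner",         # or "PrismRunner" where available
    streaming=True,
    environment_type="LOOPBACK",  # run the Python SDK harness in the launching process
)

with beam.Pipeline(options=options) as p:
    ...  # same transforms as in the thread below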

On Sun, May 5, 2024 at 2:41 PM Jaehyeon Kim <[email protected]> wrote:

> Hi XQ
>
> Yes, it works with the FlinkRunner. Thank you so much!
>
> Cheers,
> Jaehyeon
>
> On Mon, 6 May 2024 at 02:49, XQ Hu via user <[email protected]> wrote:
>
>> Have you tried other runners? I think this might be caused by some gaps in
>> the Python DirectRunner's support for streaming cases or SDFs.
>>
>> On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim <[email protected]> wrote:
>>
>>> Hi XQ
>>>
>>> Thanks for checking it out. SDF chaining itself seems to work, as I built my
>>> pipeline by converting one written with the Java SDK. The source of the Java
>>> pipeline can be found at
>>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>>>
>>> So far, when I yield the outputs, the second SDF gets stuck, whereas it gets
>>> executed if I return them (but then the first SDF completes and stops). If I
>>> change the second SDF into a plain DoFn without the restriction tracker, it
>>> executes fine. I'm not sure what happens in the first scenario.
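>>>
>>> For reference, this is roughly the plain-DoFn variant I mean (a sketch only;
>>> the start/end field names on MyFile are shorthand based on the
>>> MyFile(file, 0, num_lines) constructor call further below):
>>>
>>> class ProcessFilesNonSdfFn(beam.DoFn):
>>>     def process(self, element: MyFile):
>>>         # No RestrictionParam is declared, so this is a regular DoFn rather
>>>         # than an SDF; it simply emits one record per line of the file.
>>>         for line_num in range(element.start, element.end):
>>>             yield f"{element.name} - line {line_num}"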
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> On Sun, 5 May 2024 at 09:21, XQ Hu via user <[email protected]>
>>> wrote:
>>>
>>>> I played with your example. Indeed, create_tracker in your ProcessFilesFn is
>>>> never called, which is quite strange. I could not find any example that
>>>> shows chained SDFs, which makes me wonder whether chained SDFs work at all.
>>>>
>>>> @Chamikara Jayalath <[email protected]> Any thoughts?
>>>>
>>>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am building a pipeline that chains two SDFs. The first
>>>>> (DirectoryWatchFn) continuously checks a folder and picks up any new file
>>>>> that is added. The second (ProcessFilesFn) processes a file line by line -
>>>>> the processing simply prints the file name and line number.
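>>>>>
>>>>> The two transforms are chained roughly as below (a simplified sketch; the
>>>>> placeholder path and the final print step are illustrative - the actual
>>>>> wiring is in the pipeline source linked below):
>>>>>
>>>>> (
>>>>>     p
>>>>>     | beam.Create(["/path/to/watched/folder"])  # directory to watch
>>>>>     | beam.ParDo(DirectoryWatchFn())            # first SDF: emits new file objects
>>>>>     | beam.ParDo(ProcessFilesFn())              # second SDF: reads each file line by line
>>>>>     | beam.Map(print)
>>>>> )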
>>>>>
>>>>> The process method of the first SDF gets stuck if I yield a new file
>>>>> object. Specifically, although the second SDF is invoked - I can see that
>>>>> its initial restriction is created - its tracker is never created at all!
>>>>>
>>>>> On the other hand, if I return the list of file objects, the second SDF
>>>>> works fine, but then the first SDF stops as soon as it returns the first
>>>>> list of files.
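>>>>>
>>>>> In other words (a stripped-down sketch of the two variants of the process
>>>>> body of the first SDF; the full code is at the bottom of this message):
>>>>>
>>>>> # variant 1: yielding - the downstream SDF's tracker is never created
>>>>> for new_file in new_files:
>>>>>     yield new_file[0]
>>>>> tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>>>
>>>>> # variant 2: returning - the downstream SDF runs, but this SDF never polls again
>>>>> return [new_file[0] for new_file in new_files]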
>>>>>
>>>>> The source of the pipeline can be found at:
>>>>> - First SDF:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>>>> - Second SDF:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>>>> - Pipeline:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>>>
>>>>> Can you please inform me how to handle this issue?
>>>>>
>>>>> Cheers,
>>>>> Jaehyeon
>>>>>
>>>>> import os
>>>>> import typing
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.utils.timestamp import MAX_TIMESTAMP, Duration, Timestamp
>>>>>
>>>>> # MyFile, DirectoryWatchRestrictionProvider, DirectoryWatchRestrictionTracker,
>>>>> # DirectoryWatchWatermarkEstimatorProvider and the remaining imports
>>>>> # (RestrictionTrackerView, WatermarkEstimatorProvider, ManualWatermarkEstimator)
>>>>> # are in the linked directory_watch.py.
>>>>>
>>>>>
>>>>> class DirectoryWatchFn(beam.DoFn):
>>>>>     POLL_TIMEOUT = 10
>>>>>
>>>>>     @beam.DoFn.unbounded_per_element()
>>>>>     def process(
>>>>>         self,
>>>>>         element: str,
>>>>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>>>>             DirectoryWatchRestrictionProvider()
>>>>>         ),
>>>>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>>>>>             DirectoryWatchWatermarkEstimatorProvider()
>>>>>         ),
>>>>>     ) -> typing.Iterable[MyFile]:
>>>>>         new_files = self._get_new_files_if_any(element, tracker)
>>>>>         if self._process_new_files(tracker, watermark_estimater, new_files):
>>>>>             # return [new_file[0] for new_file in new_files]  # <-- doesn't get stuck, but the SDF finishes
>>>>>             for new_file in new_files:  # <-- gets stuck if yielding file objects
>>>>>                 yield new_file[0]
>>>>>         else:
>>>>>             return
>>>>>         # Reschedule the remaining work so the folder is polled again later.
>>>>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>>>
>>>>>     def _get_new_files_if_any(
>>>>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>>>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>>>>         # Collect files in the watched folder that have not been processed yet.
>>>>>         new_files = []
>>>>>         for file in os.listdir(element):
>>>>>             if (
>>>>>                 os.path.isfile(os.path.join(element, file))
>>>>>                 and file not in tracker.current_restriction().already_processed
>>>>>             ):
>>>>>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>>>>>                 new_file = MyFile(file, 0, num_lines)
>>>>>                 print(new_file)
>>>>>                 new_files.append(
>>>>>                     (
>>>>>                         new_file,
>>>>>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>>>>>                     )
>>>>>                 )
>>>>>         return new_files
>>>>>
>>>>>     def _process_new_files(
>>>>>         self,
>>>>>         tracker: DirectoryWatchRestrictionTracker,
>>>>>         watermark_estimater: ManualWatermarkEstimator,
>>>>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>>>>     ):
>>>>>         # Claim each new file and advance the watermark to the latest
>>>>>         # modification timestamp observed so far.
>>>>>         max_instance = watermark_estimater.current_watermark()
>>>>>         for new_file in new_files:
>>>>>             if tracker.try_claim(new_file[0].name) is False:
>>>>>                 watermark_estimater.set_watermark(max_instance)
>>>>>                 return False
>>>>>             if max_instance < new_file[1]:
>>>>>                 max_instance = new_file[1]
>>>>>         watermark_estimater.set_watermark(max_instance)
>>>>>         return max_instance < MAX_TIMESTAMP
>>>>>
>>>>
