The Python local runner has limited support for streaming pipelines.
For the time being, I would recommend using Dataflow or Flink (the latter
can be run locally) to try out streaming pipelines.
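
For reference, the per-window behaviour asked for in the quoted message can be sketched in plain Python without any runner. This is only an illustration under stated assumptions, not how any Beam runner is implemented: the names `assign_fixed_window` and `simulate_streaming` are hypothetical, input is assumed to arrive in timestamp order, and the watermark is assumed to equal the latest timestamp seen. It shows each fixed window being emitted as soon as the watermark passes its end, rather than everything being emitted once the source stops.

```python
from collections import defaultdict


def assign_fixed_window(timestamp, size):
    """Return the (start, end) of the fixed window that contains timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def simulate_streaming(events, size):
    """Yield (window, values) once the assumed watermark passes a window's end.

    events: iterable of (timestamp, value) pairs in timestamp order.
    Assumption: the watermark equals the latest timestamp seen so far.
    """
    open_windows = defaultdict(list)
    for timestamp, value in events:
        # The watermark has advanced to `timestamp`: fire every closed window.
        closed = sorted(w for w in open_windows if w[1] <= timestamp)
        for window in closed:
            yield window, open_windows.pop(window)
        open_windows[assign_fixed_window(timestamp, size)].append(value)
    # Input exhausted: flush whatever windows are still open.
    for window in sorted(open_windows):
        yield window, open_windows.pop(window)


# One element per second for 16 seconds, grouped into 4-second windows.
events = [(t, t * 10) for t in range(16)]
fired = list(simulate_streaming(events, size=4))
for window, values in fired:
    print(window, values)
```

In a runner with full streaming support, the first group would appear roughly 4 seconds in, not after all 16 seconds have elapsed.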

On Fri, Mar 8, 2024 at 2:11 PM Puertos tavares, Jose J (Canada) via
user <user@beam.apache.org> wrote:
>
> Hello  Hu:
>
>
>
> Not really. As you have coded it, this one finishes as per
> stop_timestamp=time.time() + 16, and only after it finishes emitting does
> everything else get output; the pipeline then terminates in batch mode.
>
>
>
> You can rule out STDOUT issues and confirm this behavior by adding, after the
> GroupBy, a ParDo that throws an exception, writes temporary files, or makes
> HTTP requests. This ParDo won’t be executed until your PeriodicImpulse
> terminates (you can extend it to +60 and see that it is not triggered at
> each 4-second window, but only once the impulse stops generating).
>
>
>
> I am looking for something that is truly streaming and executes continuously,
> so that in this case, every 4 seconds, the pipeline processes the elements in
> the window and then waits for the next window to accumulate.
>
>
>
> Regards,
>
> JP
>
> INTERNAL USE
>
> From: XQ Hu <x...@google.com>
> Sent: Friday, March 8, 2024 3:51 PM
> To: user@beam.apache.org
> Cc: Puertos tavares, Jose J (Canada) <jose_j_puertos_tava...@homedepot.com>
> Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
>
>
>
> Is this what you are looking for?
>
>
>
> import random
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import trigger, window
> from apache_beam.transforms.periodicsequence import PeriodicImpulse
> from apache_beam.utils.timestamp import Timestamp
>
> with beam.Pipeline() as p:
>     input = (
>         p
>         | PeriodicImpulse(
>             start_timestamp=time.time(),
>             stop_timestamp=time.time() + 16,
>             fire_interval=1,
>             apply_windowing=False,
>         )
>         | beam.Map(lambda x: random.random())
>         | beam.WindowInto(window.FixedWindows(4))
>         | beam.GroupBy()
>         | "Print Windows"
>         >> beam.transforms.util.LogElements(
>             with_timestamp=True, with_window=True
>         )
>     )
>
>
>
> On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user 
> <user@beam.apache.org> wrote:
>
> Hello Beam Users!
>
>
>
> I was looking into a simple example in Python of an unbounded (--streaming
> flag) pipeline that generates random numbers, applies a fixed window (let’s
> say 5 seconds), then applies a grouping operation (reshuffle) and prints
> the result, just to check.
>
>
>
> I noticed that this seems to work as long as there is no grouping operation
> (Reshuffle, GroupBy, etc.) that would leverage the windowing semantics.
>
>
>
>     # Get parameters from the command line for the pipeline
>     known_args, pipeline_options = parser.parse_known_args(argv)
>     pipeline_options = PipelineOptions(flags=argv)
>
>     # Create pipeline
>     p = beam.Pipeline(options=pipeline_options)
>
>     # Execute pipeline
>     (p | "Start pipeline " >> beam.Create([0])
>     | "Get values" >> beam.ParDo(RandomNumberGenerator())
>     | 'Applied fixed windows ' >> beam.WindowInto(window.FixedWindows(1*5))
>     | 'Reshuffle ' >> beam.Reshuffle()
>     | "Print" >> beam.Map(
>         lambda x: print("{} - {} ".format(os.getpid(), x), flush=True))
>     )
>
>     result = p.run()
>     result.wait_until_finish()
>
>
>
>
>
> Even though the random generator is unbounded and tagged as such with the
> decorator, it seems to get stuck. If I make that step finite (i.e. adding a
> counter and exiting), then the code works in regular batch mode.
>
>
>
> # =============================================================================
> # Class for a Splittable DoFn that generates random numbers
> # =============================================================================
>
>
>
> @beam.transforms.core.DoFn.unbounded_per_element()
> class RandomNumberGenerator(beam.DoFn):
>
>     @beam.transforms.core.DoFn.unbounded_per_element()
>     def process(self, element):
>         import random
>         import time
>
>         counter = 0
>
>         while True:
>             #if counter>5:
>             #    break
>             nmb = random.randint(0, 1000)
>             wait = random.randint(0, 5)
>             rnow = time.time()
>
>             print("Randy random", nmb)
>
>             yield beam.window.TimestampedValue(nmb, rnow)
>             time.sleep(wait)
>             counter += 1
>
>
>
> I have tried to implement the tracker and watermark as per the documentation,
> but none of that seems to work either, for the DirectRunner or the
> FlinkRunner (even there, where reshuffle is not a custom operation but a
> vertex between the different ParDos). It just seems to get stuck.
>
>
>
> I even tried using the native PeriodicImpulse [beam.apache.org] to factor
> out any issue with my own implementation; however, I still got the same
> result of it being ‘stuck’ on the GroupBy/Reshuffle operation.
>
>
>
> In the past I have created streaming pipelines with the Java SDK using an
> Unbounded Source (now obsolete, it seems, according to the docs). However, I
> noticed that most of the unbounded Python readers, like Kafka
> [beam.apache.org] and PubSub [beam.apache.org], use ExternalTransforms behind
> the scenes, so I am starting to wonder whether such unbounded sources are
> supported natively in Python at all.
>
>
>
> I have done some Internet searching and even tried LLMs to get a suggestion,
> but I have not been successful in getting a clear answer on how to achieve
> this in Python, or whether it is even possible. After spending a couple of
> days on it, I figured I could ask the Beam team and hear your thoughts. If
> you can point me to any sample that might work, so I can analyze it and
> understand what is missing, that would be greatly appreciated.
>
>
>
>
>
>
>
> Regards,
>
> JP – A fellow Apache Beam enthusiast
>
>
>
> ________________________________
>
>
> The information in this Internet Email is confidential and may be legally 
> privileged. It is intended solely for the addressee. Access to this Email by 
> anyone else is unauthorized. If you are not the intended recipient, any 
> disclosure, copying, distribution or any action taken or omitted to be taken 
> in reliance on it, is prohibited and may be unlawful. When addressed to our 
> clients any opinions or advice contained in this Email are subject to the 
> terms and conditions expressed in any applicable governing The Home Depot 
> terms of business or client engagement letter. The Home Depot disclaims all 
> responsibility and liability for the accuracy and content of this attachment 
> and for any damages or losses arising from any inaccuracies, errors, viruses, 
> e.g., worms, trojan horses, etc., or other items of a destructive nature, 
> which may be contained in this attachment and shall not be liable for direct, 
> indirect, consequential or special damages in connection with this e-mail 
> message or its attachment.
>
