OK, as I read it, get_responses yields several elements for a single
input message, and you want to defer writing to PubSub until all of
them have been processed. The easiest way to do this would be for
get_responses to return a list of messages as a single element (i.e.
use beam.Map rather than beam.FlatMap for the API call) and for
send_to_output to process the whole list, returning a single "done"
signal which could then be written.
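
A rough sketch of that first option, in case it helps. The bodies of
get_responses and post_to_endpoint here are hypothetical stand-ins (I
don't know what your real API calls look like), and call_api and
build_list_pipeline are names I made up for illustration. The Beam
wiring reuses the transforms from your snippet and is untested against
your pipeline.

```python
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def get_responses(item_id, start, end):
    """Hypothetical stand-in for the API call; returns ONE list per input."""
    return [
        {"id": item_id, "start": start.isoformat(), "end": end.isoformat(), "part": i}
        for i in range(3)
    ]


def post_to_endpoint(response):
    """Hypothetical stand-in for the real POST to the API endpoint."""
    return True


def send_to_output(responses):
    """Post every response in the list, then return a single 'done' signal."""
    for response in responses:
        post_to_endpoint(response)
    return "process completed"


def call_api(x):
    """Parse the split fields and fetch the whole list of responses."""
    return get_responses(
        x[0],
        datetime.strptime(x[1], DATE_FORMAT),
        datetime.strptime(x[2], DATE_FORMAT),
    )


def build_list_pipeline(pubsub_message, output_topic):
    """Wire the steps together; needs apache_beam installed."""
    import apache_beam as beam

    return (
        pubsub_message
        # Map (not FlatMap) keeps the list together as one element.
        | 'API call' >> beam.Map(call_api)
        # One output element per input message: the "done" string.
        | 'WriteOutput' >> beam.Map(send_to_output)
        | 'process completed' >> beam.io.WriteStringsToPubSub(topic=output_topic)
    )
```

Since send_to_output now emits exactly one element per input message,
the write to topic2 happens once per input rather than once per
response.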

You could also tag the various outputs of get_responses with a common
key and then do a (windowed) group-by-key to colocate them again, but
that might be overkill in this case.
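
For completeness, the keyed variant might look roughly like the sketch
below. It assumes the first split field (x[0]) is a suitable key for
the input message, uses an arbitrary 60-second fixed window, and
stubs out get_responses and send_to_output; tag_responses, send_keyed,
done_message and build_keyed_pipeline are made-up names. Two inputs
with the same key landing in the same window would be merged into one
"done" message, so treat this as a starting point only.

```python
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def get_responses(item_id, start, end):
    """Hypothetical stand-in for the API call."""
    return [{"id": item_id, "part": i} for i in range(3)]


def send_to_output(response):
    """Hypothetical stand-in that posts ONE response to the endpoint."""
    return True


def tag_responses(x):
    """Yield (key, response) pairs that share the key of the input message."""
    key = x[0]  # assumption: the first field identifies the input message
    start = datetime.strptime(x[1], DATE_FORMAT)
    end = datetime.strptime(x[2], DATE_FORMAT)
    for response in get_responses(key, start, end):
        yield key, response


def send_keyed(kv):
    """Post one response and pass the key along so results can be regrouped."""
    key, response = kv
    send_to_output(response)
    return key, 1


def done_message(kv):
    """Turn one grouped (key, results) pair into a single 'done' string."""
    key, results = kv
    return "process completed: %s (%d responses)" % (key, len(list(results)))


def build_keyed_pipeline(pubsub_message, output_topic):
    """Wire the steps together; needs apache_beam installed."""
    import apache_beam as beam
    from apache_beam.transforms import window

    return (
        pubsub_message
        | 'API call' >> beam.FlatMap(tag_responses)
        | 'WriteOutput' >> beam.Map(send_keyed)
        # A streaming GroupByKey needs non-global windowing (or a trigger).
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))
        | 'Regroup' >> beam.GroupByKey()
        | 'Done' >> beam.Map(done_message)
        | 'process completed' >> beam.io.WriteStringsToPubSub(topic=output_topic)
    )
```

The outputs of a FlatMap inherit the timestamp of the input element,
so all responses for one message should fall into the same fixed
window and come out of the GroupByKey together.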

On Thu, Sep 19, 2019 at 2:54 PM Anjana Pydi <anjan...@bahwancybertek.com> wrote:
>
> Hi Robert,
>
> Thanks for the reply. The 'get_responses' method returns a list of dictionaries, 
> and the 'send_to_output' method takes each element of the list and simultaneously 
> posts it to an API endpoint.
>
> My question is: I need to send a signal (write a 'process completed' 
> message) to pubsub topic2 only after the send_to_output step completes 
> (i.e. after processing the entire list from the previous step).
>
> If I add it after the last step as below, I suspect it would write a message 
> for every element of the list. Is there any way to do it only once at the 
> end?
>
> beam.Map(lambda output: send_to_output(output)) | 'process completed' >> 
> beam.io.WriteStringsToPubSub(topic=known_args.output_topic)
>
> Regards,
> Anjana
> ________________________________________
> From: Robert Bradshaw [rober...@google.com]
> Sent: Thursday, September 19, 2019 2:31 PM
> To: user
> Cc: Richard Amrith Lourdu
> Subject: Re: Publish message to pubsub topic after processing current input 
> in beam streaming pipeline
>
> On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi
> <anjan...@bahwancybertek.com> wrote:
> >
> > Hi ,
> >
> > I have a Beam streaming pipeline that reads data from a pubsub topic, uses 
> > it to call an API and get responses, applies some transformations to the 
> > obtained responses, and writes to output sinks.
> >
> > Now I need to add logic to write a 'process completed' message to another 
> > pubsub topic once the process has finished. Could someone please share 
> > thoughts on how I can add it?
> >
> > I actually want to achieve this:
> >
> > topic1 (data)-> triggers pipeline and writes complete message at end-> 
> > topic2 (complete msg)
>
> Your code below looks fine so far. I'm assuming your send_to_output
> function produces the message that you want to send to topic2, right?
> (BTW, you can write beam.Map(send_to_output) rather than having to
> write beam.Map(lambda output: send_to_output(output)).)
>
> In that case, you just need to add
>
>     data | beam.io.WriteStringsToPubSub(topic2)
>
> > When topic1 sees the message on topic2, it again posts new data to topic1.
> >
> > Below is the code sample:
> >
> >   pubsub_message = (
> >       p
> >       | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(
> >           topic=known_args.input_topic)
> >       | 'split and add' >> beam.ParDo(split_item))
> >
> >   data = (
> >       pubsub_message
> >       | 'API call' >> beam.FlatMap(lambda x: get_responses(
> >           x[0],
> >           datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
> >           datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S")))
> >       | 'WriteOutput' >> beam.Map(lambda output: send_to_output(output)))
> >
> > Thanks,
> > Anjana
> >
> >
> > -----------------------------------------------------------------------------------------------------------------------
> >  The information contained in this communication is intended solely for the 
> > use of the individual or entity to whom it is addressed and others 
> > authorized to receive it. It may contain confidential or legally privileged 
> > information. If you are not the intended recipient you are hereby notified 
> > that any disclosure, copying, distribution or taking any action in reliance 
> > on the contents of this information is strictly prohibited and may be 
> > unlawful. If you are not the intended recipient, please notify us 
> > immediately by responding to this email and then delete it from your 
> > system. Bahwan Cybertek is neither liable for the proper and complete 
> > transmission of the information contained in this communication nor for any 
> > delay in its receipt.
