OK, as I read it, get_responses yields several elements for a single input message, and you want to defer writing to PubSub until all of them have been processed. The easiest way to do this would be for get_responses to return a list of messages (applied with beam.Map rather than beam.FlatMap, so the list stays a single element) and for send_to_output to process the whole list, returning a single "done" signal which could then get written.
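A minimal sketch of that first approach, in case it helps. The function bodies are stand-ins I made up (the real get_responses and send_to_output call your API), and send_all_to_output is a hypothetical name for the list-processing wrapper; only the shape matters: one list in, one "done" string out.

```python
# Hypothetical stubs standing in for the functions from the thread.
posted = []  # records what would have been posted; for illustration only


def get_responses(item, start, end):
    """Return a list of response dicts for ONE input message (stub)."""
    return [{"item": item, "start": start, "end": end, "part": i} for i in range(3)]


def send_to_output(response):
    """Post a single response to the output endpoint (stub: just records it)."""
    posted.append(response)


def send_all_to_output(responses):
    """Post every response for one input message, then emit one 'done' signal."""
    for response in responses:
        send_to_output(response)
    return "process completed"


# Wiring in the pipeline would look roughly like this (date parsing omitted):
#   done = (pubsub_message
#           | 'API call' >> beam.Map(lambda x: get_responses(x[0], x[1], x[2]))
#           | 'WriteOutput' >> beam.Map(send_all_to_output))
#   done | 'process completed' >> beam.io.WriteStringsToPubSub(
#       topic=known_args.output_topic)
```

Because each step maps one element to one element, this should write exactly one message to topic2 per message read from topic1, after all of that message's responses have been posted.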
You could also do something by tagging the various outputs of get_responses with a common key, and then doing a (windowed) group-by-key to colocate them again, but that might be overkill in this case.

On Thu, Sep 19, 2019 at 2:54 PM Anjana Pydi <anjan...@bahwancybertek.com> wrote:
>
> Hi Robert,
>
> Thanks for the reply. 'get_responses' method return a list of dictionaries
> and 'send_to_output' method takes each element of the list and simultaneously
> posts it to an API end point.
>
> My question is I just need to send a signal(write a message 'process
> completed' ) to pubsub topic2 only after completion of send_to_output step
> (i.e. after processing entire list from before step).
>
> If I add it after last step like below, I suspect it would write message for
> every element of the list. Is there any way that I can do it only once at the
> end?
>
> beam.Map(lambda output: send_to_output(output)) | 'process completed' >>
> beam.io.WriteStringsToPubSub(topic=known_args.output_topic)
>
> Regards,
> Anjana
> ________________________________________
> From: Robert Bradshaw [rober...@google.com]
> Sent: Thursday, September 19, 2019 2:31 PM
> To: user
> Cc: Richard Amrith Lourdu
> Subject: Re: Publish message to pubsub topic after processing current input
> in beam streaming pipeline
>
> On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi
> <anjan...@bahwancybertek.com> wrote:
> >
> > Hi ,
> >
> > I have a beam streaming pipeline which reads data from pubsub topic, use it
> > to call an API and get responses, apply some transformations on the
> > obtained responses and writes to output sinks.
> >
> > Now, I need to add logic to write a 'process completed' message to another
> > pubsub topic once after the process gets finished. Can some one please
> > provide your thoughts on how can I add it.
> >
> > I actually want to achieve this:
> >
> > topic1 (data)-> triggers pipeline and writes complete message at end->
> > topic2 (complete msg)
>
> Your code below looks fine so far. I'm assuming your send_to_output
> function produces the message that you want to send to topic2, right?
> (BTW, you can write beam.Map(send_to_output) rather than having to
> write beam.Map(lambda output: send_to_output(output)).)
>
> In that case, you just need to add
>
>     data | beam.io.WriteStringsToPubSub(topic2)
>
> > When topic1 sees message on topic2, it again posts new data to topic1
> >
> > Below is the code sample:
> >
> > pubsub_message = p | 'Read From Pubsub' >>
> > beam.io.ReadStringsFromPubSub(topic=known_args.input_topic) | 'split and
> > add' >> beam.ParDo(split_item)
> >
> > data = pubsub_message | 'API call' >> beam.FlatMap(lambda x:
> > get_responses(x[0],
> > datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
> > datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S"))) |
> > 'WriteOutput' >> beam.Map(lambda output: send_to_output(output))
> >
> > Thanks,
> > Anjana
> >
> >
> > -----------------------------------------------------------------------------------------------------------------------
> > The information contained in this communication is intended solely for the
> > use of the individual or entity to whom it is addressed and others
> > authorized to receive it. It may contain confidential or legally privileged
> > information. If you are not the intended recipient you are hereby notified
> > that any disclosure, copying, distribution or taking any action in reliance
> > on the contents of this information is strictly prohibited and may be
> > unlawful. If you are not the intended recipient, please notify us
> > immediately by responding to this email and then delete it from your
> > system. Bahwan Cybertek is neither liable for the proper and complete
> > transmission of the information contained in this communication nor for any
> > delay in its receipt.