Re: PubSub MultiReader
I made the modification to fuse topics and subscriptions, and I now think it may be cleaner. Attached is a file with both versions. Let me know what you think :D

Thanks

On Wed, Sep 9, 2020 at 5:35 PM Iñigo San Jose wrote:
>
> Hi everyone!
>
> I have seen a very common use case in Beam: pipelines that read from
> multiple PubSub topics or multiple subscriptions and end up flattening
> them. In general, this makes the pipeline harder to understand, since not
> much context can be taken from it.
>
> I was thinking of adding a PTransform that reads from a list of
> topics/subscriptions with a simple expand() that calls the actual
> ReadFromPubSub transform. This approach would help with both developing
> and debugging the pipelines:
>
> - No time spent developing a multi-reader
> - Easier organization of topics/subscriptions
> - A pipeline graph that is easier on the eye and less convoluted
> - Faster debugging
> - Avoids issues where a pipeline is so wide that it hides other parts of
>   the graph
>
> As mentioned, I only have the PTransform based on expand(), but in the
> future implementing this with Splittable DoFn would be the way to go. The
> PTransform takes 3 optional parameters:
>
> - topics: list of topics
> - subscriptions: list of subscriptions
> - with_context: boolean. If True, it adds the topic/subscription name to
>   the message, so it becomes a tuple of (topic/subscription name, message).
>   This could be helpful for future aggregations. The name may need to change.
>
> The parameters `topics` and `subscriptions` could be fused into a single
> parameter, using the path [1] to know whether it is a topic or a
> subscription, but I consider it cleaner this way.
>
> Please find attached the current class I made as well as some screenshots
> of how the pipeline looks.
>
> Since I don't know much about SplittableDoFns yet, I was considering
> making a pull request for this PTransform and, in the meantime, working on
> a SplittableDoFn version.
> Thanks a lot for your time, let me know what you think
> Iñigo
>
> [1] projects/<project>/subscriptions/<subscription>
>     projects/<project>/topics/<topic>
>
> --
> • Iñigo San Jose | josein...@google.com
> • Big Data Technical Solutions Engineer
> • Google Cloud Platform
> • Google, Dublin


from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam import Flatten, Map, PTransform


class PubSubMultipleReader(PTransform):
    def __init__(self, topics=[], subscriptions=[], with_context=False):
        self.topics = topics
        self.subscriptions = subscriptions
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []

        for topic in self.topics:
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]

            current_topic = (
                pcol
                | f'PubSub Topics/project:{topic_project}/Read {topic_name}' >> ReadFromPubSub(
                    topic=topic))

            if self.with_context:
                name = f'PubSub Topics/project:{topic_project}/Add Keys {topic_name}'
                # Bind the loop variable as a default argument so each Map
                # keeps its own topic instead of the last one in the loop.
                current_topic = current_topic | name >> Map(
                    lambda x, topic=topic: (topic, x))

            topics_subscriptions_pcol.append(current_topic)

        for subscription in self.subscriptions:
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]

            current_subscription = (
                pcol
                | f'PubSub Subscriptions/project:{subscription_project}/Read {subscription_name}' >> ReadFromPubSub(
                    subscription=subscription))

            if self.with_context:
                name = f'PubSub Subscriptions/project:{subscription_project}/Add Keys {subscription_name}'
                current_subscription = current_subscription | name >> Map(
                    lambda x, subscription=subscription: (subscription, x))

            topics_subscriptions_pcol.append(current_subscription)

        return tuple(topics_subscriptions_pcol) | Flatten()


class PubSubMultipleReaderV2(PTransform):
    def __init__(self, source_list=[], with_context=False):
        self.source_list = source_list
        self.with_context = with_context

    def expand(self, pcol):
        sources_pcol = []

        for source in self.source_list:
            source_split = source.split('/')
            source_project = source_split[1]
            source_type = source_split[2]
            source_name = source_split[-1]

            step_name_base = f'PubSub {source_type}/project:{source_project}'

            if source_type == 'topics':
                current_source = (
                    pcol
                    | f'{step_name_base}/Read {source_name}' >> ReadFromPubSub(
                        topic=source))
            else:
                current_source = (
                    pcol
                    | f'{step_name_base}/Read {source_name}' >> ReadFromPubSub(
                        subscription=source))

            if self.with_context:
                name = f'{step_name_base}/Add Keys {source_name}'
                current_source = current_source | name >> Map(
                    lambda x, source=source: (source, x))

            sources_pcol.append(current_source)

        return tuple(sources_pcol) | Flatten()
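For illustration, a minimal usage sketch of the fused version with with_context=True. The project, topic, and subscription paths below are placeholders, and the per-source count is only one example of the aggregations the added key enables:

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Placeholder Pub/Sub paths; topics and subscriptions can be mixed freely.
sources = [
    'projects/my-project/topics/events-a',
    'projects/other-project/subscriptions/events-b-sub',
]

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
     | 'Read Pub/Sub sources' >> PubSubMultipleReaderV2(
         source_list=sources, with_context=True)  # emits (source path, message)
     | 'Window' >> beam.WindowInto(window.FixedWindows(60))
     | 'Count per source' >> beam.combiners.Count.PerKey()
     | 'Log' >> beam.Map(print))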
PubSub MultiReader
Hi everyone!

I have seen a very common use case in Beam: pipelines that read from multiple PubSub topics or multiple subscriptions and end up flattening them. In general, this makes the pipeline harder to understand, since not much context can be taken from it.

I was thinking of adding a PTransform that reads from a list of topics/subscriptions with a simple expand() that calls the actual ReadFromPubSub transform. This approach would help with both developing and debugging the pipelines:

- No time spent developing a multi-reader
- Easier organization of topics/subscriptions
- A pipeline graph that is easier on the eye and less convoluted
- Faster debugging
- Avoids issues where a pipeline is so wide that it hides other parts of the graph

As mentioned, I only have the PTransform based on expand(), but in the future implementing this with Splittable DoFn would be the way to go. The PTransform takes 3 optional parameters:

- topics: list of topics
- subscriptions: list of subscriptions
- with_context: boolean. If True, it adds the topic/subscription name to the message, so it becomes a tuple of (topic/subscription name, message). This could be helpful for future aggregations. The name may need to change.

The parameters `topics` and `subscriptions` could be fused into a single parameter, using the path [1] to know whether it is a topic or a subscription, but I consider it cleaner this way.

Please find attached the current class I made as well as some screenshots of how the pipeline looks.

Since I don't know much about SplittableDoFns yet, I was considering making a pull request for this PTransform and, in the meantime, working on a SplittableDoFn version.

Thanks a lot for your time, let me know what you think
Iñigo

[1] projects/<project>/subscriptions/<subscription>
    projects/<project>/topics/<topic>


from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam import Flatten, Map, PTransform


class PubSubMultipleReader(PTransform):
    def __init__(self, topics=[], subscriptions=[], with_context=False):
        self.topics = topics
        self.subscriptions = subscriptions
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []

        for topic in self.topics:
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]

            current_topic = (
                pcol
                | f'PubSub Topics/project:{topic_project}/Read {topic_name}' >> ReadFromPubSub(
                    topic=topic))

            if self.with_context:
                name = f'PubSub Topics/project:{topic_project}/Add Keys {topic_name}'
                # Bind the loop variable as a default argument so each Map
                # keeps its own topic instead of the last one in the loop.
                current_topic = current_topic | name >> Map(
                    lambda x, topic=topic: (topic, x))

            topics_subscriptions_pcol.append(current_topic)

        for subscription in self.subscriptions:
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]

            current_subscription = (
                pcol
                | f'PubSub Subscriptions/project:{subscription_project}/Read {subscription_name}' >> ReadFromPubSub(
                    subscription=subscription))

            if self.with_context:
                name = f'PubSub Subscriptions/project:{subscription_project}/Add Keys {subscription_name}'
                current_subscription = current_subscription | name >> Map(
                    lambda x, subscription=subscription: (subscription, x))

            topics_subscriptions_pcol.append(current_subscription)

        return tuple(topics_subscriptions_pcol) | Flatten()
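For reference, a minimal usage sketch of the transform above; the project, topic, and subscription paths are placeholders, not real resources:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    messages = (
        p
        # Read and flatten three Pub/Sub sources in a single composite step.
        | 'Read Pub/Sub sources' >> PubSubMultipleReader(
            topics=['projects/my-project/topics/events-a',
                    'projects/my-project/topics/events-b'],
            subscriptions=['projects/my-project/subscriptions/events-c-sub'])
        | 'Log' >> beam.Map(print))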
[PROPOSAL] Mean Fn without_defaults()
Hi all!

Some time ago I was tinkering with Apache Beam and I ran into the typical error when using global combiners with windows (the need for without_defaults()). I was using the Mean transform and noticed that I couldn't call without_defaults() on it; I had to use CombineGlobally(MeanCombineFn()).without_defaults() instead. I checked the Python code [1] and made a "fix" so that Mean.Globally().without_defaults() works without running into errors.

Please find attached both the quick tests I ran and the modified version of the file [1]. I was considering making a pull request to Apache Beam, but I would like to discuss it with someone first in case there is a reason for not supporting `without_defaults()` there. I would highly appreciate your feedback on this.

Thanks a lot,
Iñigo

PS: I also tried the same approach with Count, but I couldn't make it work (I think because of the accumulators).
PS2: This was made with Apache Beam 2.20.0, but it should also work in 2.21.0.

[1] https://beam.apache.org/releases/pydoc/2.20.0/_modules/apache_beam/transforms/combiners.html#Mean

--
• Iñigo San Jose | josein...@google.com
• Big Data Technical Solutions Engineer T2
• Google Cloud Platform
• Google, Dublin


import logging
import random
import time
import datetime

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import Create, FlatMap, Map, ParDo, Filter, Flatten, Partition
from apache_beam import Keys, Values, GroupByKey, CoGroupByKey, CombineGlobally, CombinePerKey
from apache_beam import pvalue, window, WindowInto
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.util import WithKeys
from apache_beam.transforms.combiners import Top, Mean, Count, MeanCombineFn, CountCombineFn

with beam.Pipeline() as p:
    scores = [
        {"player": "Juan", "score": 1000, "timestamp": "2020-04-30 15:35"},
        {"player": "Marina", "score": 1500, "timestamp": "2020-04-30 16:10"},
        {"player": "Cristina", "score": 2000, "timestamp": "2020-04-30 15:00"},
        {"player": "Cristina", "score": 3500, "timestamp": "2020-04-30 15:45"},
        {"player": "Marina", "score": 500, "timestamp": "2020-04-30 16:30"},
        {"player": "Juan", "score": 4000, "timestamp": "2020-04-30 15:15"},
        {"player": "Cristina", "score": 3000, "timestamp": "2020-04-30 16:50"},
        {"player": "Juan", "score": 2000, "timestamp": "2020-04-30 16:59"},
    ]

    def date2unix(string):
        unix = int(time.mktime(
            datetime.datetime.strptime(string, "%Y-%m-%d %H:%M").timetuple()))
        return unix

    def toKV(element):
        return (element['player'], element['score'])

    create = (p
              | "Create" >> Create(scores)
              | "Add timestamps" >> Map(
                  lambda x: window.TimestampedValue(x, date2unix(x['timestamp']))))

    mean_pk = (create
               | "To KV" >> Map(toKV)
               | "FixedWindow" >> WindowInto(window.FixedWindows(60 * 60))
               | "Mean Per Key" >> Mean.PerKey()
               | Map(lambda x: print("Mean per Player: {} ".format(x))))

    add_score = (create | "Get Score" >> Map(lambda x: x['score']))
    add_window = add_score | "SlidingWindow" >> WindowInto(
        window.SlidingWindows(60 * 60, 60 * 20))

    (add_score
     | "Modified No window" >> Mean.Globally()
     | "Print avg no window" >> Map(
         lambda x: print("No Window Average: {} ".format(x))))

    (add_window
     | "Original" >> CombineGlobally(MeanCombineFn()).without_defaults()
     | "Print avg og" >> Map(
         lambda x: print("Original Average: {} ".format(x))))

    (add_window
     | "Modified" >> Mean.Globally().without_defaults()
     | "Print avg modified" >> Map(
         lambda x: print("Modified Average: {} ".format(x))))


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
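For context, a hypothetical sketch (not the attached patch) of the kind of change being proposed: the without_defaults() choice is stored on the transform and applied when it expands into CombineGlobally.

import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn


class MeanGloballySketch(beam.PTransform):
    """Illustrative stand-in for combiners.Mean.Globally that supports
    without_defaults(); hypothetical, not the attached modification."""

    def __init__(self):
        self._has_defaults = True

    def without_defaults(self):
        # Record the choice; it is applied when the transform expands.
        self._has_defaults = False
        return self

    def expand(self, pcoll):
        combine = beam.CombineGlobally(MeanCombineFn())
        if not self._has_defaults:
            # Mirror CombineGlobally.without_defaults() for windowed inputs.
            combine = combine.without_defaults()
        return pcoll | combine

With something along these lines, the windowed branch in the test above could call MeanGloballySketch().without_defaults() the same way the modified Mean.Globally() is used.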