Re: PubSub MultiReader

2020-09-09 Thread Iñigo San Jose
I made the modification to fuse topics and subscriptions into a single
parameter, and I now think it may be cleaner. I have attached a file with
both versions.

Let me know what you think :D

Thanks

On Wed, Sep 9, 2020 at 5:35 PM Iñigo San Jose  wrote:

>
> Hi everyone!
>
> A very common use case I have seen in Beam is a pipeline that reads from
> multiple PubSub topics or subscriptions and then flattens them. In general,
> this makes the pipeline harder to understand, since the pipeline graph
> conveys little context.
>
> I was thinking of adding a PTransform that reads from a list of
> topics/subscriptions as a simple extension of the existing ReadFromPubSub
> transform. This approach would help with both developing and debugging
> pipelines:
>
>    - No time spent developing a multi reader
>    - Easier organization of topics/subscriptions
>    - Pipeline graph that is easier on the eye and less convoluted
>    - Faster debugging
>    - Avoids issues when pipelines become too wide and hide other parts of
>    the pipeline
>
> As mentioned, for now I only have the PTransform based on that extension,
> but in the future implementing this with a Splittable DoFn would be the way
> to go. The PTransform takes 3 optional parameters:
>
>    - topics: list of topics
>    - subscriptions: list of subscriptions
>    - with_context: boolean. If True, it adds the topic/subscription name
>    to the message, so each element becomes a tuple of (topic/subscription
>    name, message). This could be helpful for later aggregations. The
>    parameter name may need to change.
>
> The parameters `topics` and `subscriptions` could be fused into a single
> parameter, using the path [1] to tell whether it's a topic or a
> subscription, but I consider it cleaner this way.
>
> Please find attached the current class I made as well as some screenshots
> of how the pipeline looks.
>
> Since I don't know much about SplittableDoFns yet, I was considering
> making a Pull Request for this PTransform and, in the meantime, working on
> a SplittableDoFn version.
>
> Thanks a lot for your time, let me know what you think
> Iñigo
>
> [1] projects//subscriptions/
> projects//topics/
>


-- 

• Iñigo San Jose | josein...@google.coom
• Big Data Technical Solutions Engineer
• Google Cloud Platform
• Google, Dublin
from apache_beam import Flatten, Map, PTransform
from apache_beam.io.gcp.pubsub import ReadFromPubSub


class PubSubMultipleReader(PTransform):
    """Reads from several PubSub topics and subscriptions and flattens them."""

    def __init__(self, topics=None, subscriptions=None, with_context=False):
        super().__init__()
        # Avoid mutable default arguments; copy the inputs into fresh lists.
        self.topics = list(topics or [])
        self.subscriptions = list(subscriptions or [])
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []
        for topic in self.topics:
            # Element 1 of the split path is the project, the last one the name.
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]
            current_topic = (
                pcol
                | f'PubSub Topics/project:{topic_project}/Read {topic_name}'
                >> ReadFromPubSub(topic=topic)
            )
            if self.with_context:
                name = f'PubSub Topics/project:{topic_project}/Add Keys {topic_name}'
                # Pass the topic as an argument so each Map keeps its own value
                # rather than the loop variable's final value.
                current_topic = current_topic | name >> Map(
                    lambda x, source: (source, x), topic)

            topics_subscriptions_pcol.append(current_topic)

        for subscription in self.subscriptions:
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]
            current_subscription = (
                pcol
                | f'PubSub Subscriptions/project:{subscription_project}/Read {subscription_name}'
                >> ReadFromPubSub(subscription=subscription)
            )
            if self.with_context:
                name = f'PubSub Subscriptions/project:{subscription_project}/Add Keys {subscription_name}'
                # Same binding fix as for topics above.
                current_subscription = current_subscription | name >> Map(
                    lambda x, source: (source, x), subscription)

            topics_subscriptions_pcol.append(current_subscription)

        # Merge every per-source PCollection into a single output.
        return tuple(topics_subscriptions_pcol) | Flatten()


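class PubSubMultipleReaderV2(PTransform):
    """Fused variant: one list of paths, with the type taken from each path."""

    def __init__(self, source_list=None, with_context=False):
        super().__init__()
        # Avoid a mutable default argument; copy the input into a fresh list.
        self.source_list = list(source_list or [])
        self.with_context = with_context

    def expand(self, pcol):
        sources_pcol = []
        for source in self.source_list:
            source_split = source.split('/')
            source_project = source_split[1]
            source_type = source_split[2]  # 'topics' or 'subscriptions'
            source_name = source_split[-1]

            step_name_base = f'PubSub {source_type}/project:{source_project}'

            if source_type == 'topics':
                current_source = (
                    pcol
                    | f'{step_name_base}/Read {source_name}'
                    >> ReadFromPubSub(topic=source)
                )
            # NOTE: the archived attachment is cut off after the topic branch.
            # Everything below is an assumed reconstruction, written by
            # analogy with PubSubMultipleReader above.
            else:
                current_source = (
                    pcol
                    | f'{step_name_base}/Read {source_name}'
                    >> ReadFromPubSub(subscription=source)
                )
            if self.with_context:
                current_source = (
                    current_source
                    | f'{step_name_base}/Add Keys {source_name}'
                    >> Map(lambda x, src: (src, x), source)
                )

            sources_pcol.append(current_source)

        return tuple(sources_pcol) | Flatten()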

PubSub MultiReader

2020-09-09 Thread Iñigo San Jose
Hi everyone!

A very common use case I have seen in Beam is a pipeline that reads from
multiple PubSub topics or subscriptions and then flattens them. In general,
this makes the pipeline harder to understand, since the pipeline graph
conveys little context.

I was thinking of adding a PTransform that reads from a list of
topics/subscriptions as a simple extension of the existing ReadFromPubSub
transform. This approach would help with both developing and debugging
pipelines:

   - No time spent developing a multi reader
   - Easier organization of topics/subscriptions
   - Pipeline graph that is easier on the eye and less convoluted
   - Faster debugging
   - Avoids issues when pipelines become too wide and hide other parts of
   the pipeline

As mentioned, for now I only have the PTransform based on that extension,
but in the future implementing this with a Splittable DoFn would be the way
to go. The PTransform takes 3 optional parameters:

   - topics: list of topics
   - subscriptions: list of subscriptions
   - with_context: boolean. If True, it adds the topic/subscription name to
   the message, so each element becomes a tuple of (topic/subscription
   name, message). This could be helpful for later aggregations. The
   parameter name may need to change.
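
The `with_context` behaviour can be sketched in plain Python, without Beam. This is only an illustration under assumptions: `tag_with_source` and the sample paths are hypothetical, and plain lists stand in for PCollections. It also shows why the source name should be bound per iteration when the tagging function is created inside a loop.

```python
from functools import partial


def tag_with_source(source, message):
    """Return the (source name, message) tuple described for with_context."""
    return (source, message)


# Hypothetical source paths and payloads standing in for PubSub reads.
messages_by_source = {
    "projects/demo/topics/orders": [b"o1", b"o2"],
    "projects/demo/subscriptions/clicks-sub": [b"c1"],
}

# Late binding: every lambda looks up `source` when called, so all of them
# see the last value the loop variable took.
late_bound = [lambda msg: (source, msg) for source in messages_by_source]

# Binding per iteration with functools.partial keeps each source distinct.
bound = [partial(tag_with_source, source) for source in messages_by_source]

# "Flatten" the tagged messages of every source into one list.
flattened = []
for tagger, msgs in zip(bound, messages_by_source.values()):
    flattened.extend(tagger(m) for m in msgs)
```

Downstream steps can then group or filter on the first element of each tuple to tell the sources apart.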

The parameters `topics` and `subscriptions` could be fused into a single
parameter, using the path [1] to tell whether it's a topic or a
subscription, but I consider it cleaner this way.
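
A minimal sketch of how a single fused parameter could tell the two apart from the path alone. It assumes the usual four-part resource form (projects, project id, resource type, resource name); `parse_pubsub_path`, `read_kwargs` and the sample paths are hypothetical names used only for illustration.

```python
def parse_pubsub_path(path):
    """Split a PubSub resource path into (project, kind, name).

    Assumes four '/'-separated parts, the first being 'projects' and the
    third being either 'topics' or 'subscriptions'.
    """
    parts = path.split("/")
    if len(parts) != 4 or parts[0] != "projects":
        raise ValueError(f"Not a PubSub resource path: {path!r}")
    _, project, kind, name = parts
    if kind not in ("topics", "subscriptions"):
        raise ValueError(f"Unknown resource type {kind!r} in {path!r}")
    if not project or not name:
        raise ValueError(f"Empty project or name in {path!r}")
    return project, kind, name


def read_kwargs(path):
    """Pick the keyword argument a reader would need for this path."""
    _, kind, _ = parse_pubsub_path(path)
    return {"topic": path} if kind == "topics" else {"subscription": path}
```

Validating the path up front means a malformed entry fails when the pipeline is built rather than later inside a read step.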

Please find attached the current class I made as well as some screenshots
of how the pipeline looks.

Since I don't know much about SplittableDoFns yet, I was considering making
a Pull Request for this PTransform and, in the meantime, working on a
SplittableDoFn version.

Thanks a lot for your time, let me know what you think
Iñigo

[1] projects//subscriptions/
projects//topics/
from apache_beam import Flatten, Map, PTransform
from apache_beam.io.gcp.pubsub import ReadFromPubSub


class PubSubMultipleReader(PTransform):
    """Reads from several PubSub topics and subscriptions and flattens them."""

    def __init__(self, topics=None, subscriptions=None, with_context=False):
        super().__init__()
        # Avoid mutable default arguments; copy the inputs into fresh lists.
        self.topics = list(topics or [])
        self.subscriptions = list(subscriptions or [])
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []
        for topic in self.topics:
            # Element 1 of the split path is the project, the last one the name.
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]
            current_topic = (
                pcol
                | f'PubSub Topics/project:{topic_project}/Read {topic_name}'
                >> ReadFromPubSub(topic=topic)
            )
            if self.with_context:
                name = f'PubSub Topics/project:{topic_project}/Add Keys {topic_name}'
                # Pass the topic as an argument so each Map keeps its own value
                # rather than the loop variable's final value.
                current_topic = current_topic | name >> Map(
                    lambda x, source: (source, x), topic)

            topics_subscriptions_pcol.append(current_topic)

        for subscription in self.subscriptions:
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]
            current_subscription = (
                pcol
                | f'PubSub Subscriptions/project:{subscription_project}/Read {subscription_name}'
                >> ReadFromPubSub(subscription=subscription)
            )
            if self.with_context:
                name = f'PubSub Subscriptions/project:{subscription_project}/Add Keys {subscription_name}'
                # Same binding fix as for topics above.
                current_subscription = current_subscription | name >> Map(
                    lambda x, source: (source, x), subscription)

            topics_subscriptions_pcol.append(current_subscription)

        # Merge every per-source PCollection into a single output.
        return tuple(topics_subscriptions_pcol) | Flatten()

[PROPOSAL] Mean Fn without_defaults()

2020-06-04 Thread Iñigo San Jose
Hi all!

Some time ago I was tinkering with Apache Beam and ran into the typical
error when using global combiners together with windows (the need for
without_defaults()). I was using the Mean transform and noticed that it
does not offer the without_defaults() option, so I had to use
"CombineGlobally(MeanCombineFn()).without_defaults()" instead.

I checked the Python code [1] and made a "fix" to be able to use
Mean.Globally().without_defaults() without running into errors. Please find
attached both the quick tests I ran and the modified version of the file
[1].

I was considering making a pull request to Apache Beam, but I would like to
discuss it with someone first, in case there is a reason for not supporting
`without_defaults()` there. I would highly appreciate your feedback on
this.
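
For readers unfamiliar with why defaults matter here, a plain-Python sketch of the accumulator steps a mean combiner goes through. It assumes a (sum, count) accumulator and NaN as the result for empty input; `MeanSketch` and `combine` are hypothetical names and are not Beam's `MeanCombineFn`. With defaults enabled, a global combine has to produce an output even for empty input, which is the case `without_defaults()` opts out of for windowed data.

```python
import math


class MeanSketch:
    """Hypothetical stand-in that mimics the combiner steps for a mean."""

    def create_accumulator(self):
        # (running sum, element count)
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        # Partial results from different workers are summed component-wise.
        accumulators = list(accumulators)
        total = sum(t for t, _ in accumulators)
        count = sum(c for _, c in accumulators)
        return (total, count)

    def extract_output(self, accumulator):
        total, count = accumulator
        # Empty input has no meaningful mean; NaN is the assumed default.
        return total / count if count else float("nan")


def combine(fn, values):
    """Run the combiner steps sequentially over an iterable of numbers."""
    acc = fn.create_accumulator()
    for value in values:
        acc = fn.add_input(acc, value)
    return fn.extract_output(acc)


fn = MeanSketch()
mean_of_scores = combine(fn, [1000, 1500, 2000])
mean_of_nothing = combine(fn, [])
merged_mean = fn.extract_output(
    fn.merge_accumulators([(3000.0, 2), (4500.0, 2)]))
```

Here `mean_of_nothing` is the "default" output: a value emitted even though no element contributed to it.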

Thanks a lot,
Iñigo

PS: I also tried the same approach with Count, but I couldn't make it work
(I think because of the accumulators)

PS2: This was made with Apache Beam 2.20.0, but it should also work in 2.21.0


[1] https://beam.apache.org/releases/pydoc/2.20.0/_modules/apache_beam/transforms/combiners.html#Mean

-- 

• Iñigo San Jose | josein...@google.coom
• Big Data Technical Solutions Engineer T2
• Google Cloud Platform
• Google, Dublin
import logging
import random
import time
import datetime


import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import Create, FlatMap, Map, ParDo, Filter, Flatten, Partition
from apache_beam import Keys, Values, GroupByKey, CoGroupByKey, CombineGlobally, CombinePerKey
from apache_beam import pvalue, window, WindowInto
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.util import WithKeys
from apache_beam.transforms.combiners import Top, Mean, Count, MeanCombineFn, CountCombineFn


with beam.Pipeline() as p:
    scores = [
        {"player": "Juan", "score": 1000, "timestamp": "2020-04-30 15:35"},
        {"player": "Marina", "score": 1500, "timestamp": "2020-04-30 16:10"},
        {"player": "Cristina", "score": 2000, "timestamp": "2020-04-30 15:00"},
        {"player": "Cristina", "score": 3500, "timestamp": "2020-04-30 15:45"},
        {"player": "Marina", "score": 500, "timestamp": "2020-04-30 16:30"},
        {"player": "Juan", "score": 4000, "timestamp": "2020-04-30 15:15"},
        {"player": "Cristina", "score": 3000, "timestamp": "2020-04-30 16:50"},
        {"player": "Juan", "score": 2000, "timestamp": "2020-04-30 16:59"},
    ]

    def date2unix(string):
        """Convert a 'YYYY-MM-DD HH:MM' string to a Unix timestamp (local time)."""
        parsed = datetime.datetime.strptime(string, "%Y-%m-%d %H:%M")
        return int(time.mktime(parsed.timetuple()))

    def toKV(element):
        return (element['player'], element['score'])

    create = (
        p
        | "Create" >> Create(scores)
        | "Add timestamps" >> Map(
            lambda x: window.TimestampedValue(x, date2unix(x['timestamp'])))
    )

    # Per-key mean over one-hour fixed windows.
    mean_pk = (
        create
        | "To KV" >> Map(toKV)
        | "FixedWindow" >> WindowInto(window.FixedWindows(60 * 60))
        | "Mean Per Key" >> Mean.PerKey()
        | Map(lambda x: print("Mean per Player: {} ".format(x)))
    )

    add_score = create | "Get Score" >> Map(lambda x: x['score'])

    # One-hour sliding windows that start every 20 minutes.
    add_window = add_score | "SlidingWindow" >> WindowInto(
        window.SlidingWindows(60 * 60, 60 * 20))

    # Global window: Mean.Globally() with its default behaviour.
    (
        add_score
        | "Modified No window" >> Mean.Globally()
        | "Print avg no window" >> Map(
            lambda x: print("No Window Average: {} ".format(x)))
    )

    # Windowed input: the existing workaround through CombineGlobally.
    (
        add_window
        | "Original" >> CombineGlobally(MeanCombineFn()).without_defaults()
        | "Print avg og" >> Map(
            lambda x: print("Original Average: {} ".format(x)))
    )

    # Windowed input: relies on the modified combiners.py attached to this message.
    (
        add_window
        | "Modified" >> Mean.Globally().without_defaults()
        | "Print avg modified" >> Map(
            lambda x: print("Modified Average: {} ".format(x)))
    )
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#