Ensuring a task does not get executed concurrently

2023-06-12 Thread Stephan Hoyer via dev
Can the Beam data model (specifically the Python SDK) support executing
functions that are idempotent but not concurrency-safe?

I am thinking of a task like setting up a database (or in my case, a Zarr
store in Xarray-Beam) where it is not safe to run setup
concurrently, but if the whole operation fails it is safe to retry.

I recognize that a better model would be to use entirely atomic operations,
but sometimes this can be challenging to guarantee for tools that were not
designed with parallel computing in mind.

Cheers,
Stephan


Re: Hierarchical fanout with Beam combiners?

2023-05-27 Thread Stephan Hoyer via dev
On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw  wrote:

> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
> think fanning out more than this has been explored, but I would imagine
> that for most cases the increased IO would negate most if not all of the
> benefits.
>

My reasoning for multi-level fanout would be that the total amount of IO
converges as a geometric series: at each level, the amount of data is
reduced by a factor of 1/fanout. So even with fanout=2 at each level, the
total amount of IO is only twice the IO of not using fanout at all. In
general, the IO overhead would be a factor of fanout / (fanout - 1).
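
As a quick sanity check of that arithmetic (plain Python, nothing
Beam-specific):

def total_io_overhead(fanout, levels=30):
    # Per-level data volumes 1, 1/fanout, 1/fanout**2, ... relative to the
    # IO of not using fanout at all.
    return sum((1.0 / fanout) ** level for level in range(levels))


assert abs(total_io_overhead(2) - 2.0) < 1e-6  # fanout=2 -> ~2x total IO
assert abs(total_io_overhead(3) - 1.5) < 1e-6  # fanout=3 -> ~1.5x total IO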


> In particular, note that we already do "combiner lifting" to do as much
> combining as we can on the mapper side, e.g. suppose we have M elements and
> N workers. Each worker will (to a first order of approximation) combine M/N
> elements down to a single element, leaving N elements total to be combined
> by a worker in the subsequent stage. If N is large (or the combining
> computation expensive) one can use with_hot_key_fanout to add an
> intermediate step and let the N workers each combine M/N elements into
> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
> number of elements) is small enough that multiple levels are not needed.
>

Thanks for clarifying, Robert. I did not realize that "combiner lifting" was
a thing! We had been operating under the assumption that we should set the
fanout to sqrt(M), which could indeed be bigger than sqrt(N). In general,
"with_hot_key_fanout" could use documentation explaining exactly what the
parameter means and indicating suggested usage (e.g., set it to sqrt(N)).
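
For reference, here is a minimal sketch (illustrative names and sizes) of
the kind of usage I have in mind:

import math

import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn

NUM_WORKERS = 1000                    # assumption: rough worker count
FANOUT = int(math.sqrt(NUM_WORKERS))  # ~30, per the suggestion above

with beam.Pipeline() as p:
    means_per_key = (
        p
        | beam.Create([(i % 10, float(i)) for i in range(100_000)])
        | beam.CombinePerKey(MeanCombineFn()).with_hot_key_fanout(FANOUT)
    )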

I will say that one other concern for us is memory usage. We typically work
with large, image-like data coming out of weather simulations, which we try
to divide into 10-100 MB chunks. With 1000 workers, this would suggest
fanout=30, which means combining up to 3 GB of data on a single machine.
This is probably fine but doesn't leave a large margin for error.


>  On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <
> dev@beam.apache.org> wrote:
>
>> We have some use-cases where we are combining over very large sets (e.g.,
>> computing the average of 1e5 to 1e6 elements, corresponding to hourly
>> weather observations over the past 50 years).
>>
>> "with_hot_key_fanout" seems to be rather essential for performing these
>> calculations, but as far as I can tell it only performs a single level of
>> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
>> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>>
>> My guess is that such combiners could be much more efficient if this was
>> performed in a hierarchical/multi-stage fashion proportional to
>> log(element_count), e.g., summing 100 elements with 3 stages, or maybe
>> summing 10 elements with 6 stages. Dask uses such a "tree reduction"
>> strategy as controlled by the "split_every" parameter:
>> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>>
>> I understand that the number of fanout stages could not be computed
>> automatically in the Beam data model, but it would still be nice to be able
>> to specify this manually. Has there been any thought to introducing this
>> feature?
>>
>> Thanks,
>> Stephan
>>
>


Hierarchical fanout with Beam combiners?

2023-05-26 Thread Stephan Hoyer via dev
We have some use-cases where we are combining over very large sets (e.g.,
computing the average of 1e5 to 1e6 elements, corresponding to hourly
weather observations over the past 50 years).

"with_hot_key_fanout" seems to be rather essential for performing these
calculations, but as far as I can tell it only performs a single level of
fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
1e3 elements 1e3 times, and then sum those 1e3 results together.

My guess is that such combiners could be much more efficient if this was
performed in a hierarchical/multi-stage fashion proportional to
log(element_count), e.g., summing 100 elements with 3 stages, or maybe
summing 10 elements with 6 stages. Dask uses such a "tree reduction"
strategy as controlled by the "split_every" parameter:
https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
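
To illustrate the idea, here is a rough sketch (not an existing Beam API) of
emulating one extra reduction level by hand, by re-keying partial results:

import random

import apache_beam as beam


def two_stage_sum(pcoll, fanout1=1000, fanout2=32):
    # Stage 1: scatter elements across fanout1 random keys and sum each group.
    # Stage 2: fold those partial sums onto fanout2 keys and sum again.
    # Final: combine the remaining fanout2 partial sums globally.
    return (
        pcoll
        | 'KeyStage1' >> beam.Map(lambda x, f=fanout1: (random.randrange(f), x))
        | 'SumStage1' >> beam.CombinePerKey(sum)
        | 'KeyStage2' >> beam.Map(lambda kv, f=fanout2: (kv[0] % f, kv[1]))
        | 'SumStage2' >> beam.CombinePerKey(sum)
        | 'DropKeys' >> beam.Map(lambda kv: kv[1])
        | 'SumFinal' >> beam.CombineGlobally(sum)
    )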

I understand that the number of fanout stages could not be computed
automatically in the Beam data model, but it would still be nice to be able
to specify this manually. Has there been any thought to introducing this
feature?

Thanks,
Stephan


beam.Create(range(N)) without building a sequence in memory

2022-09-19 Thread Stephan Hoyer via dev
Many of my Beam pipelines start with partitioning over some large,
statically known number of inputs that could be created from a list of
sequential integers.

In Python, these sequential integers can be efficiently represented with a
range() object, which stores just the start, stop, and step. However,
beam.Create() always loads its arguments into memory, as a Python tuple. I
suspect this will start to become problematic for really big ranges (e.g.,
in the tens of millions).

I am thinking that it would make sense to either add a special optimization
to beam.Create() to recognize range() objects, or to add a dedicated
transform like beam.Range() for creating a range of inputs.
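
In the meantime, the workaround I can think of is to create only chunk
offsets client-side and expand them into indices on the workers; a rough
sketch (sizes are illustrative):

import apache_beam as beam

N = 50_000_000   # assumption: a large, statically known number of inputs
CHUNK = 100_000  # assumption: how many indices each FlatMap call expands


def expand_chunk(start, n=N, chunk=CHUNK):
    # Yield lazily so no chunk is ever materialized as a list.
    yield from range(start, min(start + chunk, n))


with beam.Pipeline() as p:
    indices = (
        p
        | 'ChunkStarts' >> beam.Create(range(0, N, CHUNK))  # only N/CHUNK elements
        | 'Reshuffle' >> beam.Reshuffle()  # spread chunks across workers
        | 'ExpandChunks' >> beam.FlatMap(expand_chunk)
    )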

Any thoughts?

Cheers,
Stephan

P.S. I initially looked into writing this as a trivial case for trying out
the Splittable DoFn API, but despite reading the docs several times could
not figure out how that is supposed to work. A self-contained and complete
example would really help for making that API accessible to new users.


Re: Cartesian product of PCollections

2022-09-19 Thread Stephan Hoyer via dev
>
> > > My team has an internal implementation of a CartesianProduct
> transform, based on using hashing to split a pcollection into a finite
> number of groups and CoGroupByKey.
> >
> > Could this be contributed to Beam?
>

If it would be of broader interest, I would be happy to work on this for
the Python SDK.

I can share a link to the code with Googlers.

On Mon, Sep 19, 2022 at 10:47 AM Robert Bradshaw 
wrote:

> If one of your inputs fits into memory, using side inputs is
> definitely the way to go. If neither side fits into memory, the cross
> product may be prohibitively large to compute even on a distributed
> computing platform (a billion times a billion is big, though I suppose
> one may hit memory limits with fewer elements if the elements
> themselves are large)


I agree, in practice the side input solution will usually suffice.
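
For reference, a minimal sketch of that side-input approach (with toy
inputs), for the case where one collection comfortably fits in memory:

import apache_beam as beam

with beam.Pipeline() as p:
    pcoll_a = p | 'CreateA' >> beam.Create(['a1', 'a2', 'a3'])
    pcoll_b = p | 'CreateB' >> beam.Create(['b1', 'b2'])

    # pcoll_b is broadcast to every worker as an in-memory side input.
    cross_product = pcoll_a | 'Cross' >> beam.FlatMap(
        lambda a, bs: [(a, b) for b in bs],
        bs=beam.pvalue.AsList(pcoll_b))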

For CartesianProduct in particular, it is pretty common for one or more of
the inputs to have a statically known size, because it was created from an
in-memory sequence (i.e., with beam.Create). Otherwise we could look at
user-supplied hints, falling back to CoGroupByKey only if required.

There is also the (not uncommon) special case where _every_ input has
statically known size, e.g., CreateCartesianProduct().

> one can still do the partitioning hack. E.g.
>
> partitions = pcoll_B | beam.Partition(lambda b, n: hash(b) % n, N)
> cross_product = tuple(
>     pcoll_A | f'Cross_{i}' >> beam.FlatMap(
>         lambda a, bs: [(a, b) for b in bs], beam.pvalue.AsList(part))
>     for i, part in enumerate(partitions)
> ) | beam.Flatten()


Interesting! I imagine this would break at some scale. Do you have an
intuition for what is a "reasonable" number of partitions -- 10s, 100s,
1000s?


Cartesian product of PCollections

2022-09-19 Thread Stephan Hoyer via dev
I'm wondering if it would make sense to have a built-in Beam transformation
for calculating the Cartesian product of PCollections.

Just this past week, I've encountered two separate cases where calculating
a Cartesian product was a bottleneck. The in-memory option of using
something like Python's itertools.product() is convenient, but it only
scales to a single node.

Unfortunately, implementing a scalable Cartesian product seems to be
somewhat non-trivial. I found two versions of this question on
StackOverflow, but neither contains a code solution:
https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections
https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/

There's a fair amount of nuance in an efficient and scalable
implementation. My team has an internal implementation of a
CartesianProduct transform, based on using hashing to split a pcollection
into a finite number of groups and CoGroupByKey. On the other hand, if any
of the input pcollections are small, using side inputs would probably be
the way to go to avoid the need for a shuffle.

Any thoughts?

Cheers,
Stephan


Re: Consider Cloudpickle instead of dill for Python pickling

2021-06-08 Thread Stephan Hoyer
To give a quick update here: I did look into what it could look like to
swap out dill -> cloudpickle internally in Beam.

In principle, making the switch would be easy: literally just swap out
"dill" in favor of "cloudpickle". Both use the same loads/dumps API.

The hard part is figuring out how to make this a configuration option.
pickler.loads() and pickler.dumps() are used all over the Beam Python
codebase, including very far away from the Pipeline object on which
configurable options live, e.g., on pvalues, transforms, coders, runners
etc. It's not obvious to me what the right way to pass on this
configuration state is, and if it needs to be done explicitly, it seems
like it's going to be a lot of work.
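
One possible shape for such a switch, sketched purely as an illustration
(this is not existing Beam code): a process-wide default selected once near
pipeline construction, so call sites far from the Pipeline object need no
extra plumbing.

import cloudpickle
import dill

_default_pickler = dill  # mirrors current behavior


def set_default_pickler(module):
    # Hypothetical hook: select the pickle implementation once, process-wide.
    global _default_pickler
    _default_pickler = module


def dumps(obj):
    return _default_pickler.dumps(obj)


def loads(data):
    return _default_pickler.loads(data)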

On Mon, May 3, 2021 at 4:11 PM Ahmet Altay  wrote:

> I agree with Robert on this one. With the exception of DillCoder, it might
> be reasonable to conditionally support both. (On a related note, I only see
> one use of DillCoder, do we really need that coder?)
>
> On Mon, May 3, 2021 at 5:01 AM Kenneth Knowles  wrote:
>
>> My 2 contradictory cents from even further back in the peanut gallery:
>>
>>  - Pickle/dill/cloudpickle/etc are most suitable for transmission, not
>> storage, so changing is a lesser breaking change. But there still might be
>> streaming pipelines that are using it and cannot be updated.
>>
>
> I believe the problem arises from transmission between a client and a
> worker when they are not using compatible libraries. Much history here (
> https://github.com/uqfoundation/dill/issues/341) but I do not think dill
> is necessarily at fault here. I think we will need to ensure using
> compatible libraries in both environments.
>
>
>>  - A serialization library with an unstable/breaking serialization format
>> is not production-ready. If open version ranges are unsafe, that is an
>> indication that it is not ready for use.
>>
>
> It was a bug and it happened twice according to the maintainer. (See the
> issue above). I am not sure it will be much better with a different library.
>
>
>>  - We should use whatever the rest of the world uses. That is more
>> important than either of the above two points.
>>
>
> Both are similarly popular. Looks like dill is a bit (50%) more popular (
> https://libraries.io/pypi/dill vs https://libraries.io/pypi/cloudpickle)
> and has a more recent release.
>
>
>>
>> Kenn
>>
>> On Sat, May 1, 2021 at 5:15 AM Jarek Potiuk  wrote:
>>
>>> Just my 2 cents  comment from the users perspective.
>>>
>>> In Airflow, the narrow limits of `dill` caused some problems with
>>> dependencies. We had to add some exceptions in our process for that:
>>> https://github.com/apache/airflow/blob/master/Dockerfile#L246
>>> https://github.com/apache/airflow/blob/master/Dockerfile.ci#L271  - so
>>> the problem is largely solved for now, but if dill would be used by any
>>> different library it could be a problem. I imagine cloudpickle is more
>>> frequently used than dill, so it might become a problem if those
>>> dependencies are narrowly defined.
>>>
>>> Currently cloudpickle for Airflow is already pulled in by
>>> Dask's "distributed" library (but they only use lower-bound (>) constraints there):
>>>
>>> distributed==2.19.0
>>>   - click [required: >=6.6, installed: 7.1.2]
>>>   - cloudpickle [required: >=1.3.0, installed: 1.4.1]
>>>   - dask [required: >=2.9.0, installed: 2021.4.1]
>>>     - cloudpickle [required: >=1.1.1, installed: 1.4.1]
>>>     - fsspec [required: >=0.6.0, installed: 2021.4.0]
>>>
>>> However, I have a better idea - why don't you simply vendor-in either
>>> `dill` or `cloudpickle` (I am not sure which one is best) ?
>>>
>>> Since you are not planning to upgrade it often (that's the whole point
>>> of narrow versioning), you can have the best of both worlds - stable
>>> version used in both client/server AND you would not be limiting others.
>>>
>>> J.
>>>
>>>
>>> On Fri, Apr 30, 2021 at 9:42 PM Stephan Hoyer  wrote:
>>>
>>>> Glad to hear this is something you're open to and in fact have already
>>>> considered :)
>>>>
>>>> I may give implementing this a try, though I'm not familiar with how
>>>> configuration options are managed in Beam, so that may be easier for a core
>>>> developer to deal with.
>>>>
>>>> On Fri, Apr 30, 2021 at 10:58 AM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> As I've mentioned before, I woul

Re: Out of band pickling in Python (pickle5)

2021-05-27 Thread Stephan Hoyer
I'm unlikely to have bandwidth to take this one on, but I do think it would
be quite valuable!

On Thu, May 27, 2021 at 4:42 PM Brian Hulette  wrote:

> I filed https://issues.apache.org/jira/browse/BEAM-12418 for this. Would
> you have any interest in taking it on?
>
> On Tue, May 25, 2021 at 3:09 PM Brian Hulette  wrote:
>
>> Hm this would definitely be of interest for the DataFrame API, which is
>> shuffling pandas objects. This issue [1] confirms what you suggested above,
>> that pandas supports out-of-band pickling since DataFrames are mostly just
>> collections of numpy arrays.
>>
>> Brian
>>
>> [1] https://github.com/pandas-dev/pandas/issues/34244
>>
>> On Tue, May 25, 2021 at 2:59 PM Stephan Hoyer  wrote:
>>
>>> Beam's PickleCoder would need to be updated to pass the
>>> "buffer_callback" argument into pickle.dumps() and the "buffers" argument
>>> into pickle.loads(). I expect this would be relatively straightforward.
>>>
>>> Then it should "just work", assuming that data is stored in objects
>>> (like NumPy arrays or wrappers of NumPy arrays) that implement the
>>> out-of-band Pickle protocol.
>>>
>>>
>>> On Tue, May 25, 2021 at 2:50 PM Brian Hulette 
>>> wrote:
>>>
>>>> I'm not aware of anyone looking at it.
>>>>
>>>> Will out-of-band pickling "just work" in Beam for types that implement
>>>> the correct interface in Python 3.8?
>>>>
>>>> On Tue, May 25, 2021 at 2:43 PM Evan Galpin 
>>>> wrote:
>>>>
>>>>> +1
>>>>>
>>>>> FWIW I recently ran into the exact case you described (high
>>>>> serialization cost). The solution was to implement some not-so-intuitive
>>>>> alternative transforms in my case, but I would have very much appreciated
>>>>> faster serialization performance.
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> On Tue, May 25, 2021 at 15:26 Stephan Hoyer  wrote:
>>>>>
>>>>>> Has anyone looked into out of band pickling for Beam's Python SDK,
>>>>>> i.e., Pickle protocol version 5?
>>>>>> https://www.python.org/dev/peps/pep-0574/
>>>>>> https://docs.python.org/3/library/pickle.html#out-of-band-buffers
>>>>>>
>>>>>> For Beam pipelines passing around NumPy arrays (or collections of
>>>>>> NumPy arrays, like pandas or Xarray) I've noticed that serialization 
>>>>>> costs
>>>>>> can be significant. Beam seems to currently incur at least one (maybe
>>>>>> two) unnecessary memory copies.
>>>>>>
>>>>>> Pickle protocol version 5 exists for solving exactly this problem.
>>>>>> You can serialize collections of arbitrary Python objects in a fully
>>>>>> streaming fashion using memory buffers. This is a Python 3.8 feature, but
>>>>>> the "pickle5" library provides a backport to Python 3.6 and 3.7. It has
>>>>>> been supported by NumPy since version 1.16, released in January 2019.
>>>>>>
>>>>>> Cheers,
>>>>>> Stephan
>>>>>>
>>>>>


Re: Out of band pickling in Python (pickle5)

2021-05-25 Thread Stephan Hoyer
Beam's PickleCoder would need to be updated to pass the "buffer_callback"
argument into pickle.dumps() and the "buffers" argument into
pickle.loads(). I expect this would be relatively straightforward.

Then it should "just work", assuming that data is stored in objects (like
NumPy arrays or wrappers of NumPy arrays) that implement the out-of-band
Pickle protocol.
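
For anyone curious, here is what that handshake looks like in plain Python
with NumPy (no Beam involved); a coder would need to do something
equivalent:

import pickle

import numpy as np

array = np.arange(1_000_000, dtype=np.float64)

buffers = []
# With protocol 5, buffer_callback collects PickleBuffer views instead of
# copying the array bytes into the pickle stream itself.
data = pickle.dumps(array, protocol=5, buffer_callback=buffers.append)

# The buffers can be transported separately (ideally zero-copy) and handed
# back at load time.
restored = pickle.loads(data, buffers=buffers)
assert np.array_equal(array, restored)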


On Tue, May 25, 2021 at 2:50 PM Brian Hulette  wrote:

> I'm not aware of anyone looking at it.
>
> Will out-of-band pickling "just work" in Beam for types that implement the
> correct interface in Python 3.8?
>
> On Tue, May 25, 2021 at 2:43 PM Evan Galpin  wrote:
>
>> +1
>>
>> FWIW I recently ran into the exact case you described (high serialization
>> cost). The solution was to implement some not-so-intuitive alternative
>> transforms in my case, but I would have very much appreciated faster
>> serialization performance.
>>
>> Thanks,
>> Evan
>>
>> On Tue, May 25, 2021 at 15:26 Stephan Hoyer  wrote:
>>
>>> Has anyone looked into out of band pickling for Beam's Python SDK, i.e.,
>>> Pickle protocol version 5?
>>> https://www.python.org/dev/peps/pep-0574/
>>> https://docs.python.org/3/library/pickle.html#out-of-band-buffers
>>>
>>> For Beam pipelines passing around NumPy arrays (or collections of NumPy
>>> arrays, like pandas or Xarray) I've noticed that serialization costs can be
>>> significant. Beam seems to currently incur at least one (maybe two)
>>> unnecessary memory copies.
>>>
>>> Pickle protocol version 5 exists for solving exactly this problem. You
>>> can serialize collections of arbitrary Python objects in a fully streaming
>>> fashion using memory buffers. This is a Python 3.8 feature, but the
>>> "pickle5" library provides a backport to Python 3.6 and 3.7. It has been
>>> supported by NumPy since version 1.16, released in January 2019.
>>>
>>> Cheers,
>>> Stephan
>>>
>>


Out of band pickling in Python (pickle5)

2021-05-25 Thread Stephan Hoyer
Has anyone looked into out of band pickling for Beam's Python SDK, i.e.,
Pickle protocol version 5?
https://www.python.org/dev/peps/pep-0574/
https://docs.python.org/3/library/pickle.html#out-of-band-buffers

For Beam pipelines passing around NumPy arrays (or collections of NumPy
arrays, like pandas or Xarray) I've noticed that serialization costs can be
significant. Beam seems to currently incur at least one (maybe two)
unnecessary memory copies.

Pickle protocol version 5 exists for solving exactly this problem. You can
serialize collections of arbitrary Python objects in a fully streaming
fashion using memory buffers. This is a Python 3.8 feature, but the
"pickle5" library provides a backport to Python 3.6 and 3.7. It has been
supported by NumPy since version 1.16, released in January 2019.

Cheers,
Stephan


Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
Happy to give a concrete example, I even have open source code I can share
in this case :)
https://github.com/google/xarray-beam/tree/9728970aa18abddafec22a23cad92b5d4a1e11e5/examples
https://github.com/google/xarray-beam/blob/9728970aa18abddafec22a23cad92b5d4a1e11e5/examples/era5_rechunk.py

This particular example reads and writes a 25 TB weather dataset stored in
Google Cloud Storage. The dataset consists of 19 variables, each of which
is logically a 3D array of shape (350640, 721, 1440), stored in blocks of
shape (31, 721, 1440) via Zarr <https://zarr.readthedocs.io/en/stable/>.
Now I want to "rechunk" them into blocks of shape (350640, 5, 5), which is
more convenient for queries like "Return the past 40 years of weather for
this particular location". To be clear, this particular use-case is
synthetic, but it reflects a common pattern for large-scale processing of
weather and climate datasets.

I originally wrote my pipeline to process all 19 variables at once, but it
looks like it would be more efficient to process them separately. So now I
want to essentially re-run my original pipeline 19 times in parallel.

For this particular codebase, I think the right call probably *is* to
rewrite all my underlying transforms to handle an expanded key, including
the variable name. This will pay other dividends. But if I didn't want to
do that refactor, I would need to duplicate or Partition the PCollection
into 19 parts, which seems like a lot. My xarray_beam.Rechunk() transform
includes a few GroupByKey transforms inside and definitely cannot operate
in-memory.



On Mon, May 24, 2021 at 4:12 PM Reuven Lax  wrote:

> Can you explain a bit more? Where are these data sets coming from?
>
> On Mon, May 24, 2021 at 3:55 PM Stephan Hoyer  wrote:
>
>> I'm not concerned with key-dependent topologies, which I didn't even
>> think was possible to express in Beam.
>>
>> It's more that I already wrote a PTransform for processing a *single* 1
>> TB dataset. Now I want to write a single PTransform that effectively runs
>> the original PTransform in groups over ~20 such datasets (ideally without
>> needing to know that number 20 ahead of time).
>>
>> On Mon, May 24, 2021 at 3:30 PM Reuven Lax  wrote:
>>
>>> Is the issue that you have a different topology depending on the key?
>>>
>>> On Mon, May 24, 2021 at 2:49 PM Stephan Hoyer  wrote:
>>>
>>>> Exactly, my use-case has another nested GroupByKey to apply per key.
>>>> But even if it could be done in a streaming fashion, it's way too much data
>>>> (1 TB) to process on a single worker in a reasonable amount of time.
>>>>
>>>> On Mon, May 24, 2021 at 2:46 PM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> I was thinking there was some non-trivial topology (such as further
>>>>> GBKs) within the logic to be applied to each key group.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, May 24, 2021 at 2:38 PM Brian Hulette 
>>>>> wrote:
>>>>>
>>>>>> Isn't it possible to read the grouped values produced by a GBK from
>>>>>> an Iterable and yield results as you go, without needing to collect all 
>>>>>> of
>>>>>> each input into memory? Perhaps I'm misunderstanding your use-case.
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Mon, May 24, 2021 at 10:41 AM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> I'm just pinging this thread because I think it is an interesting
>>>>>>> problem and don't want it to slip by.
>>>>>>>
>>>>>>> I bet a lot of users have gone through the tedious conversion you
>>>>>>> describe. Of course, it may often not be possible if you are using a
>>>>>>> library transform. There are a number of aspects of the Beam model that 
>>>>>>> are
>>>>>>> designed a specific way explicitly *because* we need to assume that a 
>>>>>>> large
>>>>>>> number of composites in your pipeline are not modifiable by you. Most
>>>>>>> closely related: this is why windowing is something carried along
>>>>>>> implicitly rather than just a parameter to GBK - that would require all
>>>>>>> transforms to expose how they use GBK under the hood and they would all
>>>>>>> have to plumb this extra key/WindowFn through every API. Instead, we 
>>>>>>> have
>>>>>>> this way to implicitly add a second key to any transform :-)

Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
I'm not concerned with key-dependent topologies, which I didn't even think
was possible to express in Beam.

It's more that I already wrote a PTransform for processing a *single* 1 TB
dataset. Now I want to write a single PTransform that effectively runs the
original PTransform in groups over ~20 such datasets (ideally without
needing to know that number 20 ahead of time).

On Mon, May 24, 2021 at 3:30 PM Reuven Lax  wrote:

> Is the issue that you have a different topology depending on the key?
>
> On Mon, May 24, 2021 at 2:49 PM Stephan Hoyer  wrote:
>
>> Exactly, my use-case has another nested GroupByKey to apply per key. But
>> even if it could be done in a streaming fashion, it's way too much data (1
>> TB) to process on a single worker in a reasonable amount of time.
>>
>> On Mon, May 24, 2021 at 2:46 PM Kenneth Knowles  wrote:
>>
>>> I was thinking there was some non-trivial topology (such as further
>>> GBKs) within the logic to be applied to each key group.
>>>
>>> Kenn
>>>
>>> On Mon, May 24, 2021 at 2:38 PM Brian Hulette 
>>> wrote:
>>>
>>>> Isn't it possible to read the grouped values produced by a GBK from an
>>>> Iterable and yield results as you go, without needing to collect all of
>>>> each input into memory? Perhaps I'm misunderstanding your use-case.
>>>>
>>>> Brian
>>>>
>>>> On Mon, May 24, 2021 at 10:41 AM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> I'm just pinging this thread because I think it is an interesting
>>>>> problem and don't want it to slip by.
>>>>>
>>>>> I bet a lot of users have gone through the tedious conversion you
>>>>> describe. Of course, it may often not be possible if you are using a
>>>>> library transform. There are a number of aspects of the Beam model that 
>>>>> are
>>>>> designed a specific way explicitly *because* we need to assume that a 
>>>>> large
>>>>> number of composites in your pipeline are not modifiable by you. Most
>>>>> closely related: this is why windowing is something carried along
>>>>> implicitly rather than just a parameter to GBK - that would require all
>>>>> transforms to expose how they use GBK under the hood and they would all
>>>>> have to plumb this extra key/WindowFn through every API. Instead, we have
>>>>> this way to implicitly add a second key to any transform :-)
>>>>>
>>>>> So in addition to being tedious for you, it would be good to have a
>>>>> better solution.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, May 21, 2021 at 7:18 PM Stephan Hoyer 
>>>>> wrote:
>>>>>
>>>>>> I'd like to write a Beam PTransform that applies an *existing* Beam
>>>>>> transform to each set of grouped values, separately, and combines the
>>>>>> result. Is anything like this possible with Beam using the Python SDK?
>>>>>>
>>>>>> Here are the closest things I've come up with:
>>>>>> 1. If each set of *inputs* to my transform fit into memory, I could
>>>>>> use GroupByKey followed by FlatMap.
>>>>>> 2. If each set of *outputs* from my transform fit into memory, I
>>>>>> could use CombinePerKey.
>>>>>> 3. If I knew the static number of groups ahead of time, I could use
>>>>>> Partition, followed by applying my transform multiple times, followed by
>>>>>> Flatten.
>>>>>>
>>>>>> In my scenario, none of these holds true. For example, currently I
>>>>>> have ~20 groups of values, with each group holding ~1 TB of data. My 
>>>>>> custom
>>>>>> transform simply shuffles this TB of data around, so each set of outputs 
>>>>>> is
>>>>>> also 1TB in size.
>>>>>>
>>>>>> In my particular case, it seems my options are to either relax these
>>>>>> constraints, or to manually convert each step of my existing transform to
>>>>>> apply per key. This conversion process is tedious, but very
>>>>>> straightforward, e.g., the GroupByKey and ParDo that my transform is 
>>>>>> built
>>>>>> out of just need to deal with an expanded key.
>>>>>>
>>>>>> I wonder, could this be something built into Beam itself, e.g., as
>>>>>> TransformPerKey? The PTransforms that result from combining other Beam
>>>>>> transforms (e.g., _ChainPTransform in Python) are private, so this seems
>>>>>> like something that would need to exist in Beam itself, if it could exist
>>>>>> at all.
>>>>>>
>>>>>> Cheers,
>>>>>> Stephan
>>>>>>
>>>>>


Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
Exactly, my use-case has another nested GroupByKey to apply per key. But
even if it could be done in a streaming fashion, it's way too much data (1
TB) to process on a single worker in a reasonable amount of time.

On Mon, May 24, 2021 at 2:46 PM Kenneth Knowles  wrote:

> I was thinking there was some non-trivial topology (such as further GBKs)
> within the logic to be applied to each key group.
>
> Kenn
>
> On Mon, May 24, 2021 at 2:38 PM Brian Hulette  wrote:
>
>> Isn't it possible to read the grouped values produced by a GBK from an
>> Iterable and yield results as you go, without needing to collect all of
>> each input into memory? Perhaps I'm misunderstanding your use-case.
>>
>> Brian
>>
>> On Mon, May 24, 2021 at 10:41 AM Kenneth Knowles  wrote:
>>
>>> I'm just pinging this thread because I think it is an interesting
>>> problem and don't want it to slip by.
>>>
>>> I bet a lot of users have gone through the tedious conversion you
>>> describe. Of course, it may often not be possible if you are using a
>>> library transform. There are a number of aspects of the Beam model that are
>>> designed a specific way explicitly *because* we need to assume that a large
>>> number of composites in your pipeline are not modifiable by you. Most
>>> closely related: this is why windowing is something carried along
>>> implicitly rather than just a parameter to GBK - that would require all
>>> transforms to expose how they use GBK under the hood and they would all
>>> have to plumb this extra key/WindowFn through every API. Instead, we have
>>> this way to implicitly add a second key to any transform :-)
>>>
>>> So in addition to being tedious for you, it would be good to have a
>>> better solution.
>>>
>>> Kenn
>>>
>>> On Fri, May 21, 2021 at 7:18 PM Stephan Hoyer  wrote:
>>>
>>>> I'd like to write a Beam PTransform that applies an *existing* Beam
>>>> transform to each set of grouped values, separately, and combines the
>>>> result. Is anything like this possible with Beam using the Python SDK?
>>>>
>>>> Here are the closest things I've come up with:
>>>> 1. If each set of *inputs* to my transform fit into memory, I could
>>>> use GroupByKey followed by FlatMap.
>>>> 2. If each set of *outputs* from my transform fit into memory, I could
>>>> use CombinePerKey.
>>>> 3. If I knew the static number of groups ahead of time, I could use
>>>> Partition, followed by applying my transform multiple times, followed by
>>>> Flatten.
>>>>
>>>> In my scenario, none of these holds true. For example, currently I have
>>>> ~20 groups of values, with each group holding ~1 TB of data. My custom
>>>> transform simply shuffles this TB of data around, so each set of outputs is
>>>> also 1TB in size.
>>>>
>>>> In my particular case, it seems my options are to either relax these
>>>> constraints, or to manually convert each step of my existing transform to
>>>> apply per key. This conversion process is tedious, but very
>>>> straightforward, e.g., the GroupByKey and ParDo that my transform is built
>>>> out of just need to deal with an expanded key.
>>>>
>>>> I wonder, could this be something built into Beam itself, e.g., as
>>>> TransformPerKey? The PTransforms that result from combining other Beam
>>>> transforms (e.g., _ChainPTransform in Python) are private, so this seems
>>>> like something that would need to exist in Beam itself, if it could exist
>>>> at all.
>>>>
>>>> Cheers,
>>>> Stephan
>>>>
>>>


Apply a Beam PTransform per key

2021-05-21 Thread Stephan Hoyer
I'd like to write a Beam PTransform that applies an *existing* Beam
transform to each set of grouped values, separately, and combines the
result. Is anything like this possible with Beam using the Python SDK?

Here are the closest things I've come up with:
1. If each set of *inputs* to my transform fit into memory, I could use
GroupByKey followed by FlatMap.
2. If each set of *outputs* from my transform fit into memory, I could use
CombinePerKey.
3. If I knew the static number of groups ahead of time, I could use
Partition, followed by applying my transform multiple times, followed by
Flatten (see the sketch below).
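
For concreteness, a rough sketch of option 3, assuming a hypothetical static
key list and a placeholder transform (illustrative names only):

import apache_beam as beam

KNOWN_KEYS = ['temperature', 'pressure', 'humidity']  # assumption: static keys


class MyTransform(beam.PTransform):
    # Stand-in for the existing transform I want to apply per key.
    def expand(self, pcoll):
        return pcoll | beam.Map(lambda kv: kv)  # placeholder logic


def apply_per_known_key(keyed_pcoll):
    partitions = keyed_pcoll | beam.Partition(
        lambda kv, n: KNOWN_KEYS.index(kv[0]), len(KNOWN_KEYS))
    results = [
        partitions[i] | f'MyTransform_{key}' >> MyTransform()
        for i, key in enumerate(KNOWN_KEYS)
    ]
    return tuple(results) | beam.Flatten()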

In my scenario, none of these holds true. For example, currently I have ~20
groups of values, with each group holding ~1 TB of data. My custom
transform simply shuffles this TB of data around, so each set of outputs is
also 1TB in size.

In my particular case, it seems my options are to either relax these
constraints, or to manually convert each step of my existing transform to
apply per key. This conversion process is tedious, but very
straightforward, e.g., the GroupByKey and ParDo that my transform is built
out of just need to deal with an expanded key.

I wonder, could this be something built into Beam itself, e.g., as
TransformPerKey? The PTransforms that result from combining other Beam
transforms (e.g., _ChainPTransform in Python) are private, so this seems
like something that would need to exist in Beam itself, if it could exist
at all.

Cheers,
Stephan


Re: Proposal: Generalize S3FileSystem

2021-05-20 Thread Stephan Hoyer
On Thu, May 20, 2021 at 10:12 AM Chad Dombrova  wrote:

> Hi Brian,
> I think the main goal would be to make a python package that could be pip
> installed independently of apache_beam.  That goal could be accomplished
> with option 3, thus preserving all of the benefits of a monorepo. If it
> gains enough popularity and contributors outside of the Beam community,
> then options 1 and 2 could be considered to make it easier to foster a new
> community of contributors.
>

This sounds like a lovely goal!

I'll just mention the "fsspec" Python project, which came out of Dask:
https://filesystem-spec.readthedocs.io/en/latest/

As far as I can tell, it serves basically this exact same purpose (generic
filesystems with high-performance IO), and has started to get some traction
in other projects, e.g., it's now used in pandas. I don't know if it would
be suitable for Beam, but it might be worth a try.
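
To give a flavor of the interface (a minimal example with a hypothetical
path):

import fsspec

# Hypothetical bucket/path; reading from GCS this way assumes the gcsfs
# backend is installed. The same call works across local, S3, GCS, etc.
with fsspec.open('gs://my-bucket/example.txt', 'rt') as f:
    first_line = f.readline()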

Cheers,
Stephan


> Beam has a lot of great tech in it, and it makes me think of Celery, which
> is a much older python project of a similar ilk that spawned a series of
> useful independent projects: kombu [1], an AMQP messaging library, and
> billiard [2], a multiprocessing library.
>
> Obviously, there are a number of pros and cons to consider.  The cons are
> pretty clear: even within a monorepo it will make the Beam build more
> complicated.  The pros are a bit more abstract.  The fileIO project could
> appeal to a broader audience, and act as a signpost for Beam (on PyPI,
> etc), thereby increasing awareness of Beam amongst the types of
> cloud-friendly python developers who would need the fileIO package.
>
> -chad
>
> [1] https://github.com/celery/kombu
> [2] https://github.com/celery/billiard
>
>
>
>
> On Thu, May 20, 2021 at 7:57 AM Brian Hulette  wrote:
>
>> That's an interesting idea. What do you mean by its own project? A couple
>> of possibilities:
>> - Spinning off a new ASF project
>> - A separate Beam-governed repository (e.g. apache/beam-filesystems)
>> - More clearly separate it in the current build system and release
>> artifacts that allow it to be used independently
>>
>> Personally I'd be resistant to the first two (I am a Google engineer and
>> I like monorepos after all), but I don't see a major problem with the last
>> one, except that it gives us another surface to maintain.
>>
>> Brian
>>
>> On Wed, May 19, 2021 at 8:38 PM Chad Dombrova  wrote:
>>
>>> This is a random idea, but the whole file IO system inside Beam would
>>> actually be awesome to extract into its own project.  IIRC, it’s not
>>> particularly tied to Beam.
>>>
>>> I’m not saying this should be done now, but it’s be nice to keep it mind
>>> for a future goal.
>>>
>>> -chad
>>>
>>>
>>>
>>> On Wed, May 19, 2021 at 10:23 AM Pablo Estrada 
>>> wrote:
>>>
 That would be great to add, Matt. Of course it's important to make this
 backwards compatible, but other than that, the addition would be very
 welcome.

 On Wed, May 19, 2021 at 9:41 AM Matt Rudary 
 wrote:

> Hi,
>
>
>
> This is a quick sketch of a proposal – I wanted to get a sense of
> whether there’s general support for this idea before fleshing it out
> further, getting internal approvals, etc.
>
>
>
> I’m working with multiple storage systems that speak the S3 api. I
> would like to support FileIO operations for these storage systems, but
> S3FileSystem hardcodes the s3 scheme (the various systems use different 
> URI
> schemes) and it is in any case impossible to instantiate more than one in
> the current design.
>
>
>
> I’d like to refactor the code in org.apache.beam.sdk.io.aws.s3 (and
> maybe …aws.options) somewhat to enable this use-case. I haven’t worked out
> the details yet, but it will take some thought to make this work in a
> non-hacky way.
>
>
>
> Thanks
>
> Matt Rudary
>



Re: Transform-specific thread pools in Python

2021-05-11 Thread Stephan Hoyer
On Mon, May 10, 2021 at 4:28 PM Ahmet Altay  wrote:

>
>
> On Mon, May 10, 2021 at 8:01 AM Stephan Hoyer  wrote:
>
>> Hi Beam devs,
>>
>> I've been exploring recently how to optimize IO bound steps for my Python
>> Beam pipelines, and have come up with a solution that I think might make
>> sense to upstream into Beam's Python SDK.
>>
>> It appears that Beam runners (at least the Cloud Dataflow runner)
>> typically use only a single thread per Python process.
>>
>
> I thought the default was not 1 but something else (12?). Maybe that
> changed.
>

>
>> The number of threads per worker can be adjusted with flags, but only for
>> the entire pipeline. This behavior makes sense *in general* under the
>> worst-case assumption that user-code in Python is CPU bound and requires
>> the GIL.
>>
>> However, multiple threads can be quite helpful in many cases, e.g.,
>> 1. CPU bound tasks that release the GIL. This is typically the case when
>> using libraries for numerical computing, such as NumPy and pandas.
>> 2. IO bound tasks that can be run asynchronously, e.g., reading/writing
>> files or RPCs. This is the use-case for which not using threads can be most
>> problematic, e.g., in a recent dataflow pipeline reading/writing lots of
>> relatively small files (~1-10 MB) to cloud storage with the default number
>> of threads per worker, I found that I was only using ~20% of available CPU.
>>
>> Because the optimal number of threads for Python code can be quite
>> heterogeneous, I would like to be able to indicate that particular steps of
>> my Beam pipelines should be executed using more threads. This would be
>> particularly valuable for writing libraries of custom IO transforms, which
>> should still conservatively assume that *other* steps in user provided
>> pipelines may be CPU bound.
>>
>> The solution I've come up with is to use beam.BatchElements with a ParDo
>> function that executes tasks in separate threads (via
>> concurrent.futures.ThreadPoolExecutor). I've used this to make high-level wrappers
>> like beam.Map, beam.MapTuple, etc that execute with multiple threads. This
>> seems to work pretty well for my use-cases. I can put these in my own
>> library, of course, but perhaps these would make sense upstream into Beam's
>> Python SDK itself?
>>
>
> I believe a related idea (async pardo) was discussed and some work was
> done earlier (https://issues.apache.org/jira/browse/BEAM-6550). AFAIK
> Flink also has a similar concept (
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/asyncio/)
> as well.
>
> Perhaps you can share a bit more details about your proposal along with
> your code and people could provide more feedback on that.
>
>
Yes, async ParDo does look like the same thing! This sounds like exactly
the same use-case -- a ParDo for IO that can be asynchronously executed.

In fact, Python has async IO, too. In my particular case threads are
slightly more convenient (the libraries I'm using do not natively support
async), but that is really a minor detail. If we had a AsyncParDo for
Python, I could make that work very easily. In the worst case, I could use
a separate thread inside each async call.

In any case, see here for my implementation of a ThreadMap PTransform:
https://github.com/google/xarray-beam/blob/0.0.1/xarray_beam/_src/threadmap.py

Let me know if you think this might be of interest upstream in Beam. I
agree that in the long term this makes sense to be implemented in runners,
though I guess that might be more challenging to implement.


>> One alternative would be supporting this sort of concurrency control
>> inside Beam runners. In principle, I imagine runners could tune thread-pool
>> size for each stage automatically, e.g., based on CPU usage. To be honest,
>> I'm a little surprised this doesn't happen already, but I'm sure there are
>> good reasons why not.
>>
>
> Runner support would be the ideal solution. Because runners could decide
> on the most optimal pool size based on the real time information.
> Supporting and using annotations would provide helpful hints for the
> runners. At least the latter part is in progres IIRC.
>
>
>>
>> Let me know what you think!
>>
>> Cheers,
>> Stephan
>>
>>


Transform-specific thread pools in Python

2021-05-10 Thread Stephan Hoyer
Hi Beam devs,

I've been exploring recently how to optimize IO bound steps for my Python
Beam pipelines, and have come up with a solution that I think might make
sense to upstream into Beam's Python SDK.

It appears that Beam runners (at least the Cloud Dataflow runner) typically
use only a single thread per Python process. The number of threads per
worker can be adjusted with flags, but only for the entire pipeline. This
behavior makes sense *in general* under the worst-case assumption that
user-code in Python is CPU bound and requires the GIL.

However, multiple threads can be quite helpful in many cases, e.g.,
1. CPU bound tasks that release the GIL. This is typically the case when
using libraries for numerical computing, such as NumPy and pandas.
2. IO bound tasks that can be run asynchronously, e.g., reading/writing
files or RPCs. This is the use-case for which not using threads can be most
problematic, e.g., in a recent dataflow pipeline reading/writing lots of
relatively small files (~1-10 MB) to cloud storage with the default number
of threads per worker, I found that I was only using ~20% of available CPU.

Because the optimal number of threads for Python code can be quite
heterogeneous, I would like to be able to indicate that particular steps of
my Beam pipelines should be executed using more threads. This would be
particularly valuable for writing libraries of custom IO transforms, which
should still conservatively assume that *other* steps in user provided
pipelines may be CPU bound.

The solution I've come up with is to use beam.BatchElements with a ParDo
function that executes tasks in separate threads (via
concurrent.futures.ThreadPoolExecutor). I've used this to make high-level wrappers
like beam.Map, beam.MapTuple, etc that execute with multiple threads. This
seems to work pretty well for my use-cases. I can put these in my own
library, of course, but perhaps these would make sense upstream into Beam's
Python SDK itself?
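
Roughly, the wrapper looks like this (a simplified sketch of the approach,
not the exact code):

import concurrent.futures

import apache_beam as beam


class ThreadMap(beam.PTransform):
    # Like beam.Map(fn), but applies fn concurrently in a thread pool within
    # each batch, so IO-bound calls overlap even with one SDK thread.
    def __init__(self, fn, num_threads=16, **batch_kwargs):
        self._fn = fn
        self._num_threads = num_threads
        self._batch_kwargs = batch_kwargs

    def expand(self, pcoll):
        def map_batch(batch):
            with concurrent.futures.ThreadPoolExecutor(self._num_threads) as ex:
                return list(ex.map(self._fn, batch))

        return (
            pcoll
            | 'BatchElements' >> beam.BatchElements(**self._batch_kwargs)
            | 'MapBatches' >> beam.FlatMap(map_batch)
        )

Usage then looks like pcoll | ThreadMap(fetch_one_file, num_threads=32),
where fetch_one_file is whatever IO-bound callable you would otherwise pass
to beam.Map.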

One alternative would be supporting this sort of concurrency control inside
Beam runners. In principle, I imagine runners could tune thread-pool size
for each stage automatically, e.g., based on CPU usage. To be honest, I'm a
little surprised this doesn't happen already, but I'm sure there are good
reasons why not.

Let me know what you think!

Cheers,
Stephan


Python API reference docs could use better organization

2021-04-30 Thread Stephan Hoyer
(Note: I also filed this as a JIRA [1] a few days ago, but I noticed that
the mailing list seems to be a better place for opening discussions.)

I've been enjoying diving into Beam recently, but to my frustration I've
found that I often need to look through the source code to discover APIs.

Beam has some really nice documentation on its website (I particularly love
the "transform catalog") but I find the "API docs" [1] to be nearly
unusable, at least for the Python SDK. For example, try clicking on any of
the sub-headings, e.g., apache_beam.io [3]. It's a long, heavily nested
listing of the raw internal structure of Beam's python modules.

To enumerate my concerns:
1. It's hard to navigate. I need to know exactly where a function is
defined to find it. E.g., to find beam.Map, I had to click on
"apache_beam.transforms package" followed by "apache_beam.transforms.core
module" and then scroll down or search in the page for "Map."
2. It isn't clear exactly which components are public APIs. The
documentation for a few modules notes that they are not public, but there
are so many others listed that I'm sure they cannot all be intended for
public support. This makes it hard to find Beam's main public APIs.
3. It isn't clear which import paths are preferred. For example,
apache_beam.Map is documented as apache_beam.transforms.core.Map, without
mention of the shorter name.

I suspect the source of most of these issues is that the API docs make
heavy use of Sphinx's autodoc for modules. In my experience maintaining
Python projects, this just doesn't work very well. autosummary and
autofunction on individual functions/classes work well, but it needs to be
organized by hand – you can't count on automodule to do a good job of high
level organization. JAX's docs are a good example, e.g., see the source
code [4] and rendered HTML [5].

This would definitely be a bit of work, but is relatively straightforward
to set up, and I think it would pay big dividends for the discoverability of Beam's
API. I've gone through this process a few times for different projects, so
I would be happy to advise if/as issues come up.

Cheers,
Stephan

[1] https://issues.apache.org/jira/browse/BEAM-12235
[2] https://beam.apache.org/releases/pydoc/2.28.0/index.html
[3] https://beam.apache.org/releases/pydoc/2.28.0/apache_beam.io.html
[4] https://github.com/google/jax/blob/master/docs/jax.rst
[5] https://jax.readthedocs.io/en/latest/jax.html


Re: Consider Cloudpickle instead of dill for Python pickling

2021-04-30 Thread Stephan Hoyer
 in dill, features although some are
> still not working for us [3].
> >>> I agree that Beam can and should support cloudpickle as a pickler.
> Practically, we can make cloudpickle the default pickler starting from a
> particular python version, for example we are planning to add Python 3.9
> support and we can try to make cloudpickle the default pickler for this
> version to avoid breaking users while ironing out rough edges.
> >>>
> >>> My main concern is client-server version range compatibility of the
> pickler. When SDK creates the job representation, it serializes the objects
> using the pickler used on the user's machine. When SDK deserializes the
> objects on the Runner side, it uses the pickler installed on the runner,
> for example it can be a dill version installed the docker container
> provided by Beam or Dataflow. We have been burned in the past by having an
> open version bound for the pickler in Beam's requirements: client side
> would pick the newest version, but runner container would have a somewhat
> older version, either because the container did not have the new version,
> or because some pipeline dependency wanted to downgrade dill. Older version
> of pickler did not correctly deserialize new pickles. I suspect cloudpickle
> may have the same problem. A solution was to have a very tight version
> range for the pickler in SDK's requirements [4]. Given that dill is not a
> popular dependency, the tight range did not create much friction for Beam
> users. I think with cloudpickle we will not be able to have a tight range. We
> could solve this problem by passing the version of pickler used at job
> submission, and have a check on the runner to make sure that the client
> version is not newer than the runner's version. Additionally, we should
> make sure cloudpickle is backwards compatible (newer version can
> deserialize objects created by older version).
> >>>
> >>> [1]
> https://lists.apache.org/thread.html/d431664a3fc1039faa01c10e2075659288aec5961c7b4b59d9f7b889%40%3Cdev.beam.apache.org%3E
> >>> [2] https://issues.apache.org/jira/browse/BEAM-8123
> >>> [3]
> https://github.com/uqfoundation/dill/issues/300#issuecomment-525409202
> >>> [4]
> https://github.com/apache/beam/blob/master/sdks/python/setup.py#L138-L143
> >>>
> >>> On Thu, Apr 29, 2021 at 8:04 PM Stephan Hoyer 
> wrote:
> >>>>
> >>>> cloudpickle [1] and dill [2] are two Python packages that implement
> extensions of Python's pickle protocol for arbitrary objects. Beam
> currently uses dill, but I'm wondering if we could consider additionally or
> alternatively use cloudpickle instead.
> >>>>
> >>>> Overall, cloudpickle seems to be a more popular choice for extended
> pickle support in distributing computing in Python, e.g., it's used by
> Spark, Dask and joblib.
> >>>>
> >>>> One of the major differences between cloudpickle and dill is how they
> handle pickling global variables (such as Python modules) that are referred
> to by a function:
> >>>> - Dill doesn't serialize globals. If you want to save globals, you
> need to call dill.dump_session(). This is what the "save_main_session" flag
> does in Beam.
> >>>> - Cloudpickle takes a different approach. It introspects which global
> variables are used by a function, and creates a closure around the
> serialized function that only contains these variables.
> >>>>
> >>>> The cloudpickle approach results in larger serialized functions, but
> it's also much more robust, because the required globals are included by
> default. In contrast, with dill, one either needs to save all globals or
> none. This is a repeated pain-point for Beam Python users [3]:
> >>>> - Saving all globals can be overly aggressive, particularly in
> notebooks where users may have incidentally created large objects.
> >>>> - Alternatively, users can avoid using global variables entirely, but
> this makes defining ad-hoc pipelines very awkward. Mapped over functions
> need to be imported from other modules, or need to have their imports
> defined inside the function itself.
> >>>>
> >>>> I'd love to see an option to use cloudpickle in Beam instead of dill,
> and to consider switching over entirely. Cloudpickle would allow Beam users
> to write readable code in the way they expect, without needing to worry
> about the confusing and potentially problematic "save_main_session" flag.
> >>>>
> >>>> Any thoughts?
> >>>>
> >>>> Cheers,
> >>>> Stephan
> >>>>
> >>>> [1] https://github.com/cloudpipe/cloudpickle
> >>>> [2] https://github.com/uqfoundation/dill
> >>>> [3]
> https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors
> >>>>
>


Consider Cloudpickle instead of dill for Python pickling

2021-04-29 Thread Stephan Hoyer
cloudpickle [1] and dill [2] are two Python packages that implement
extensions of Python's pickle protocol for arbitrary objects. Beam
currently uses dill, but I'm wondering if we could consider additionally or
alternatively use cloudpickle instead.

Overall, cloudpickle seems to be a more popular choice for extended pickle
support in distributed computing in Python, e.g., it's used by Spark, Dask
and joblib.

One of the major differences between cloudpickle and dill is how they
handle pickling global variables (such as Python modules) that are referred
to by a function:
- Dill doesn't serialize globals. If you want to save globals, you need to
call dill.dump_session(). This is what the "save_main_session" flag does in
Beam.
- Cloudpickle takes a different approach. It introspects which global
variables are used by a function, and creates a closure around the
serialized function that only contains these variables.
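
A tiny illustration of the difference (names are just for demonstration):

import cloudpickle

SCALE = 10


def scale(x):
    return x * SCALE


# For a function defined in __main__ or a notebook, cloudpickle captures the
# referenced global SCALE inside the payload, so the function can be restored
# and called in a process that never defined SCALE. With dill's default
# behavior, SCALE would instead have to be recreated on the worker (which is
# what save_main_session is for).
payload = cloudpickle.dumps(scale)
restored = cloudpickle.loads(payload)
assert restored(3) == 30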

The cloudpickle approach results in larger serialized functions, but it's
also much more robust, because the required globals are included by
default. In contrast, with dill, one either needs to save *all *globals or
none. This is a repeated pain-point for Beam Python users [3]:
- Saving all globals can be overly aggressive, particularly in notebooks
where users may have incidentally created large objects.
- Alternatively, users can avoid using global variables entirely, but this
makes defining ad-hoc pipelines very awkward. Mapped over functions need to
be imported from other modules, or need to have their imports defined
inside the function itself.

I'd love to see an option to use cloudpickle in Beam instead of dill, and
to consider switching over entirely. Cloudpickle would allow Beam users to
write readable code in the way they expect, without needing to worry about
the confusing and potentially problematic "save_main_session" flag.

Any thoughts?

Cheers,
Stephan

[1] https://github.com/cloudpipe/cloudpickle
[2] https://github.com/uqfoundation/dill
[3]
https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors