Re: Ordered PCollections eventually?

2021-05-10 Thread Sam Rohde
Awesome, thanks Pablo!

On Mon, May 10, 2021 at 4:05 PM Pablo Estrada  wrote:

> CDC would also benefit. I am working on a proposal for this that is
> concerned with streaming pipelines, and per-key ordered delivery. I will
> share with you as soon as I have a draft.
> Best
> -P.
>
> On Mon, May 10, 2021 at 2:56 PM Reuven Lax  wrote:
>
>> There has been talk, but nothing concrete.
>>
>> On Mon, May 10, 2021 at 1:42 PM Sam Rohde  wrote:
>>
>>> Hi All,
>>>
>>> I was wondering if there had been any plans for creating ordered
>>> PCollections in the Beam model? Or if there might be plans for them in the
>>> future?
>>>
>>> I know that Beam SQL and Beam DataFrames would directly benefit from an
>>> ordered PCollection.
>>>
>>> Regards,
>>> Sam
>>>
>>


Ordered PCollections eventually?

2021-05-10 Thread Sam Rohde
Hi All,

I was wondering if there had been any plans for creating ordered
PCollections in the Beam model? Or if there might be plans for them in the
future?

I know that Beam SQL and Beam DataFrames would directly benefit from an
ordered PCollection.

Regards,
Sam


Re: Making preview (sample) time consistent on Direct runner

2021-01-05 Thread Sam Rohde
Hi Ismael,

Those are good points. Do you know if the Interactive Runner has been tried
in those instances? If so, what were the shortcomings?

I can also see the use of sampling for a performance benchmarking reason.
We have seen others send in known elements which are tracked throughout the
pipeline to generate timings for each transform/stage.

-Sam

On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía  wrote:

> Hello,
>
> The use of direct runner for interactive local use cases has increased
> with the years on Beam due to projects like Scio, Kettle/Hop and our
> own SQL CLI. All these tools have in common one thing, they show a
> sample of some source input to the user and interactively apply
> transforms to it to help users build Pipelines more rapidly.
>
> If you build a pipeline today to produce this sample using Beam’s
> Sample transform from a set of files, the read of the files happens
> first and then the sample, so the more files there are or the bigger they
> are, the longer it takes to produce the sample even though the number of
> elements expected to be read is constant.
>
> During Beam Summit last year there were some discussions about how we
> could improve this scenario (and others) but I have the impression no
> further discussions happened in the mailing list, so I wanted to know
> if there are some ideas about how we can get direct runner to improve
> this case.
>
> It seems to me that we can still ‘force’ the count with some static
> field because it is not a distributed case, but I don’t know how we can
> stop reading once we have the number of sampled elements in a generic
> way, especially since it now seems a bit harder to do with the pure DoFn
> (SDF) APIs vs the old Source ones, but that’s just a guess.
>
> Does anyone have an idea of how we could generalize this and, if you see
> the value of such a use case, other ideas for improvements?
>
> Regards,
> Ismaël
>
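
For reference, a minimal sketch of the pipeline shape described above; the file pattern and sample size are made up, and the point is only that ReadFromText consumes every matching file before Sample sees a single element:

```
import apache_beam as beam

with beam.Pipeline() as p:
  sample = (
      p
      # Reads *all* matching files, no matter how few elements we want.
      | 'Read' >> beam.io.ReadFromText('gs://my-bucket/logs/*.txt')
      # Only then does the sampling combine keep 10 elements.
      | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(10)
      | 'Print' >> beam.Map(print))
```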


Docker Development Environment

2020-11-25 Thread Sam Rohde
Hi All,

I got tired of my local dev environment being ruined by updates so I made a
container for Apache Beam development work. What this does is create a
Docker container from the Ubuntu Groovy image and load it up with all the
necessary libraries/utilities for Apache Beam development. Then I run an
interactive shell in the Docker container where I do my work.

This is a nice way for new contributors to easily get started. However, with
the container in its current form, I don't know if it will help other
people because it is tied closely to my workflow (VIM with
YouCompleteMe, for Python). But I think it can be a nice starting point for
improvements. For example:

   - Sharing the host display with Docker to start GUI applications (like
   IntelliJ) in the container
   - Adding Golang development support

Here's a draft PR , let me know
what you think, how it can be improved, and whether it's a good idea for us
to have a dev container like this.

Regards,
Sam


Re: Unable to run python formater (Are the instructions out of date?)

2020-11-02 Thread Sam Rohde
I personally run `tox -e py37-lint` and `tox -e py3-yapf` from the
root/sdks/python directory and that catches most stuff. If you are adding
type annotations then also running `tox -e py37-mypy` is a good choice.
Note that tox supports tab completion, so you can see all the different
options by double-pressing tab with `tox -e` in the root/sdks/python
directory.

On Wed, Oct 28, 2020 at 8:52 PM Alex Amato  wrote:

> Thanks Chad, this was helpful. :)
>
> Btw, I think this helps format my PR somewhat, but some more checks are
> run, not covered by this tool, when I push the PR.
>
> My PR is running more checks under
> *:sdks:python:test-suites:tox:py37:mypyPy37*
>
> I am curious if anyone knows a good command line to try before pushing PRs
> to catch these issues locally first? (I had one in the past, but I think
> it's outdated).
>
>
>
> On Wed, Oct 28, 2020 at 8:41 PM Pablo Estrada  wrote:
>
>> woah I didn't know about this tool at all Chad. It looks nice : )
>> FWIW, if you feel up to it, I've given you edit access to the Beam wiki (
>> https://cwiki.apache.org/confluence/display/BEAM) in case you'd like to
>> add the tip.
>> Thanks!
>> -P.
>>
>> On Wed, Oct 28, 2020 at 8:09 PM Chad Dombrova  wrote:
>>
>>> I would like to edit it!  I have an Apache account and I am a committer,
>>> but IIRC I could not edit it with my normal credentials.
>>>
>>>
>>> On Wed, Oct 28, 2020 at 8:02 PM Robert Burke  wrote:
>>>
 (it's a wiki, so anyone who requests an account can improve it)

 On Wed, Oct 28, 2020, 7:45 PM Chad Dombrova  wrote:

> It’s unfortunate that those instructions don’t include pre-commit,
> which is by far the easiest way to do this.
>
> To set it up:
>
> pip install pre-commit
> pre-commit install
>
> `pre-commit install` sets up git pre-commit hooks so that yapf and
> pylint run on changed files every time you commit (you’ll need Python 3.7; I
> think it should be possible to loosen this, as it has been an annoyance
> for me).
>
> To skip running the check on commit add -n:
>
> git commit -nm "blah blah"
>
> Alternatively, to run the check manually on changed files (pre-commit
> install is not required to run it this way):
>
> pre-commit run yapf
>
> Or on all files:
>
> pre-commit run -a yapf
>
> More info here: https://pre-commit.com/#config-language_version
>
> On Wed, Oct 28, 2020 at 6:46 PM Alex Amato  wrote:
>
>> I tried both the tox and yapf instructions on the python tips page
>> .
>> And the Gradle target, which failed on PR precommit. I am wondering if
>> there is something additional I need to set up?
>>
>> Here is the output from all three approaches I attempted.
>> Any ideas how to get this working?
>>
>> *(ajamato_env2) ajamato@ajamato-linux0:~/beam/sdks/python$ git diff
>> --name-only --relative bigquery_python_sdk origin/master | xargs yapf
>> --in-place*
>> Traceback (most recent call last):
>>   File "/usr/local/google/home/ajamato/.local/bin/yapf", line 8, in
>> 
>> sys.exit(run_main())
>>   File
>> "/usr/local/google/home/ajamato/.local/lib/python2.7/site-packages/yapf/__init__.py",
>> line 365, in run_main
>> sys.exit(main(sys.argv))
>>   File
>> "/usr/local/google/home/ajamato/.local/lib/python2.7/site-packages/yapf/__init__.py",
>> line 135, in main
>> verbose=args.verbose)
>>   File
>> "/usr/local/google/home/ajamato/.local/lib/python2.7/site-packages/yapf/__init__.py",
>> line 204, in FormatFiles
>> in_place, print_diff, verify, quiet, verbose)
>>   File
>> "/usr/local/google/home/ajamato/.local/lib/python2.7/site-packages/yapf/__init__.py",
>> line 233, in _FormatFile
>> logger=logging.warning)
>>   File
>> "/usr/local/google/home/ajamato/.local/lib/python2.7/site-packages/yapf/yapflib/yapf_api.py",
>> line 100, in FormatFile
>> verify=verify)
>>   File
>> "/usr/local/google/home/ajamato/.local/lib/python2.7/site-packages/yapf/yapflib/yapf_api.py",
>> line 147, in FormatCode
>> tree = pytree_utils.ParseCodeToTree(unformatted_source)
>>   File
>> "/usr/local/google/home/ajamato/.local/lib/python2.7/site-packages/yapf/yapflib/pytree_utils.py",
>> line 127, in ParseCodeToTree
>> raise e
>>   File "apache_beam/metrics/execution.pxd", line 18
>> cimport cython
>>  ^
>> SyntaxError: invalid syntax
>>
>> *(ajamato_env2) ajamato@ajamato-linux0:~/beam/sdks/python$ tox -e
>> py3-yapf*
>> GLOB sdist-make:
>> /usr/local/google/home/ajamato/beam/sdks/python/setup.py
>> py3-yapf create:
>> /usr/local/google/home/ajamato/beam/sdks/python/target/.tox/py3-yapf
>> ERROR: invocation failed (exit code 1), 

Re: Proposal: ToStringFn

2020-10-28 Thread Sam Rohde
done!

On Wed, Oct 28, 2020 at 3:54 PM Tyson Hamilton  wrote:

> Can you open up comment access please?
>
> On Wed, Oct 28, 2020 at 3:40 PM Sam Rohde  wrote:
>
>> +Lukasz Cwik 
>>
>> On Tue, Oct 27, 2020 at 12:04 PM Sam Rohde  wrote:
>>
>>> Hi All,
>>>
>>> I'm working on a project in Dataflow that requires the runner to
>>> translate an element to a human-readable form. To do this, I want to add a
>>> new well-known transform that allows any runner to ask the SDK to stringify
>>> (human-readable) an element. Let me know what you think, you can find
>>> the proposed specification and implementation details here
>>> <https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit?usp=sharing>
>>> .
>>>
>>> If there are no objections, I want to start implementation as soon as I
>>> can.
>>>
>>> Regards,
>>> Sam
>>>
>>
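
As an illustration only (the real specification, including the URN and runner wiring, is in the linked doc), the SDK-side behavior being proposed amounts to a DoFn that maps each element to a human-readable string:

```
import apache_beam as beam


class ToStringFn(beam.DoFn):
  """Illustrative sketch only; not the proposed well-known transform itself.

  The proposal registers a transform like this under a URN so that a runner
  can ask any SDK to insert it; that machinery is omitted here.
  """
  def process(self, element):
    yield str(element)


# Hypothetical usage, e.g. inserted by a runner that wants readable output:
# stringified = elements | beam.ParDo(ToStringFn())
```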


Re: Proposal: ToStringFn

2020-10-28 Thread Sam Rohde
+Lukasz Cwik 

On Tue, Oct 27, 2020 at 12:04 PM Sam Rohde  wrote:

> Hi All,
>
> I'm working on a project in Dataflow that requires the runner to translate
> an element to a human-readable form. To do this, I want to add a new
> well-known transform that allows any runner to ask the SDK to stringify
> (human-readable) an element. Let me know what you think, you can find
> the proposed specification and implementation details here
> <https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit?usp=sharing>
> .
>
> If there are no objections, I want to start implementation as soon as I
> can.
>
> Regards,
> Sam
>


Re: More metadata in Coder Proto

2020-05-20 Thread Sam Rohde
+Robert Bradshaw  who is the reviewer on
https://github.com/apache/beam/pull/11503. How does that sound to you? Skip
the "is input deterministic" check for GBKs embedded in x-lang transforms?

On Wed, May 20, 2020 at 10:56 AM Sam Rohde  wrote:

> Thanks for your comments, here's a little more to the problem I'm working
> on: I have a PR to make GBK a primitive
> <https://github.com/apache/beam/pull/11503> and the aforementioned
> test_combine_globally was failing a check in the run_pipeline method of the
> DataflowRunner.
> Specifically, what fails is a check the DataflowRunner performs when it
> visits each transform: whether the GBK has a deterministic input coder. The check fails
> when the GBK is expanded from the expansion service because the resulting
> ExternalCoder doesn't override the is_deterministic method.
>
> This wasn't being hit before because this deterministic input check only
> occurred during the apply_GroupByKey method. However, I moved it to when
> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
> stage.
>
>
> On Wed, May 20, 2020 at 10:13 AM Luke Cwik  wrote:
>
>> If the CombineGlobally is being returned by the expansion service, the
>> expansion service is on the hook for ensuring that intermediate
>> PCollections/PTransforms/... are constructed correctly.
>>
> Okay, this was kind of my hunch. If the DataflowRunner is making sure that
> the input coder to a GBK is deterministic, then we should skip the check if
> we receive an x-lang transform (seen in the Python SDK as a
> RunnerAPITransformHolder).
>
>
>>
>> I thought this question was about what to do if you want to take the
>> output of an XLang pipeline and process it through some generic transform
>> that doesn't care about the types and treats it like an opaque blob (like
>> the Count transform) and how to make that work when you don't know the
>> output properties. I don't think anyone has shared a design doc for this
>> problem that covered the different approaches.
>>
> Aside from the DataflowRunner GBK problem, I was also curious if there was
> any need for metadata around the Coder proto and why there currently is no
> metadata. If there was more metadata, like an is_deterministic field, then
> the GBK deterministic input check could also work.
>
>
>
>>
>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath 
>> wrote:
>>
>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>> CombineGlobally implementation that takes a KV with a Void type (with
>>> VoidCoder) [2] as input.
>>>
>>> ExternalCoder was added to Python SDK to represent coders within
>>> external transforms that are not standard coders (in this case the
>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>> graph -> Dataflow job request" conversion.
>>>
>>> Seems like today, a runner is unable to perform this particular
>>> validation (and maybe others ?) for pipeline segments received through a
>>> cross-language transform expansion with or without the ExternalCoder. Note
>>> that a runner is not involved during cross-language transform expansion, so
>>> pipeline submission is the only location where a runner would get a chance
>>> to perform this kind of validation for cross-language transforms.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>
>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik  wrote:
>>>
>>>> Since combine globally is a case where you don't need to know what the
>>>> key or value is and could treat them as bytes allowing you to build and
>>>> execute this pipeline (assuming you ignored properties such as
>>>> is_deterministic).
>>>>
>>>> Regardless, I still think it makes sense to provide criteria on what
>>>> your output shape must be during xlang pipeline expansion which is yet to
>>>> be defined to support such a case. Your suggested solution of adding
>>>> properties to coders is one possible solution but I think we have to take a
>>>> step back and consider xlang as a whole since there are still several yet
>>>> to be solved issues within it.
>>>>
>>>>
>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde  wrote:
>>>>
>>>>> I h

Re: More metadata in Coder Proto

2020-05-20 Thread Sam Rohde
Thanks for your comments, here's a little more to the problem I'm working
on: I have a PR to make GBK a primitive
<https://github.com/apache/beam/pull/11503> and the aforementioned
test_combine_globally was failing a check in the run_pipeline method of the
DataflowRunner.
Specifically, what fails is a check the DataflowRunner performs when it
visits each transform: whether the GBK has a deterministic input coder. The check fails
when the GBK is expanded from the expansion service because the resulting
ExternalCoder doesn't override the is_deterministic method.

This wasn't being hit before because this deterministic input check only
occurred during the apply_GroupByKey method. However, I moved it to when
the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
stage.
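
A sketch of the skip being discussed here, not the actual DataflowRunner code; the helper name and the is_external marker are assumptions:

```
def check_gbk_input_coder(coder, transform_label):
  # Hypothetical helper: enforce deterministic input coders for GBKs, but
  # skip coders the SDK cannot introspect (e.g. an ExternalCoder produced
  # by a cross-language expansion).
  if getattr(coder, 'is_external', False):  # hypothetical marker attribute
    return  # Trust the expansion service to have chosen a usable coder.
  if not coder.is_deterministic():
    raise ValueError(
        'GroupByKey %s has a non-deterministic input coder: %s'
        % (transform_label, coder))
```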


On Wed, May 20, 2020 at 10:13 AM Luke Cwik  wrote:

> If the CombineGlobally is being returned by the expansion service, the
> expansion service is on the hook for ensuring that intermediate
> PCollections/PTransforms/... are constructed correctly.
>
Okay, this was kind of my hunch. If the DataflowRunner is making sure that
the input coder to a GBK is deterministic, then we should skip the check if
we receive an x-lang transform (seen in the Python SDK as a
RunnerAPITransformHolder).


>
> I thought this question was about what to do if you want to take the
> output of an XLang pipeline and process it through some generic transform
> that doesn't care about the types and treats it like an opaque blob (like
> the Count transform) and how to make that work when you don't know the
> output properties. I don't think anyone has shared a design doc for this
> problem that covered the different approaches.
>
Aside from the DataflowRunner GBK problem, I was also curious if there was
any need for metadata around the Coder proto and why there currently is no
metadata. If there was more metadata, like an is_deterministic field, then
the GBK deterministic input check could also work.



>
> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath 
> wrote:
>
>> I think you are hitting GroupByKey [1] that is internal to the Java
>> CombineGlobally implementation that takes a KV with a Void type (with
>> VoidCoder) [2] as input.
>>
>> ExternalCoder was added to Python SDK to represent coders within external
>> transforms that are not standard coders (in this case the VoidCoder). This
>> is needed to perform the "pipeline proto -> Python object graph -> Dataflow
>> job request" conversion.
>>
>> Seems like today, a runner is unable to perform this particular
>> validation (and maybe others ?) for pipeline segments received through a
>> cross-language transform expansion with or without the ExternalCoder. Note
>> that a runner is not involved during cross-language transform expansion, so
>> pipeline submission is the only location where a runner would get a chance
>> to perform this kind of validation for cross-language transforms.
>>
>> [1]
>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>
>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik  wrote:
>>
>>> Combine globally is a case where you don't need to know what the
>>> key or value is and could treat them as bytes, allowing you to build and
>>> execute this pipeline (assuming you ignored properties such as
>>> is_deterministic).
>>>
>>> Regardless, I still think it makes sense to provide criteria on what
>>> your output shape must be during xlang pipeline expansion which is yet to
>>> be defined to support such a case. Your suggested solution of adding
>>> properties to coders is one possible solution but I think we have to take a
>>> step back and consider xlang as a whole since there are still several yet
>>> to be solved issues within it.
>>>
>>>
>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde  wrote:
>>>
>>>> I have a PR that makes GBK a primitive in which the
>>>> test_combine_globally
>>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>>> is failing on the DataflowRunner. In particular, the DataflowRunner runs
>>>> over the transform in the run_pipeline method. I moved a method that
>>>> verifies that coders as inputs to GBKs are deterministic during this
>>>> run_pipeline. Previously, this was during the apply_GroupByKey.
>>

Re: More metadata in Coder Proto

2020-05-19 Thread Sam Rohde
I have a PR that makes GBK a primitive in which the test_combine_globally
<https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
is failing on the DataflowRunner. In particular, the DataflowRunner runs
over the transforms in the run_pipeline method. I moved a method that
verifies that the input coders to GBKs are deterministic into this
run_pipeline method. Previously, this ran during apply_GroupByKey.

On Tue, May 19, 2020 at 4:48 PM Brian Hulette  wrote:

> Yes I'm unclear on how a PCollection with ExternalCoder made it into a
> downstream transform that enforces is_deterministic. My understanding of
> ExternalCoder (admittedly just based on a quick look at commit history) is
> that it's a shim added so the Python SDK can handle coders that are
> internal to cross-language transforms.
> I think that if the Python SDK is trying to introspect an ExternalCoder
> instance then something is wrong.
>
> Brian
>
> On Tue, May 19, 2020 at 4:01 PM Luke Cwik  wrote:
>
>> I see. The problem is that you are trying to know certain properties of
>> the coder to use in a downstream transform which enforces that it is
>> deterministic like GroupByKey.
>>
>> In all the scenarios I have seen so far, we have required both SDKs
>> to understand the coder. How do you have a cross-language pipeline where
>> the downstream SDK doesn't understand the coder and still works?
>>
>> Also, an alternative strategy would be to tell the expansion service that
>> you need to choose a coder that is deterministic on the output. This would
>> require building the pipeline and before submission to the job server
>> perform the expansion telling it all the limitations that the SDK has
>> imposed on it.
>>
>>
>>
>>
>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde  wrote:
>>
>>> Hi all,
>>>
>>> Should there be more metadata in the Coder Proto? For example, adding an
>>> "is_deterministic" boolean field. This will allow for a language-agnostic
>>> way for SDKs to infer properties about a coder received from the expansion
>>> service.
>>>
>>> My motivation for this is that I recently ran into a problem in which an
>>> "ExternalCoder" in the Python SDK was erroneously marked as
>>> non-deterministic. The reason being is that the Coder proto doesn't have an
>>> "is_deterministic" and when the coder fails to be recreated in Python, the
>>> ExternalCoder defaults to False.
>>>
>>> Regards,
>>> Sam
>>>
>>>


More metadata in Coder Proto

2020-05-19 Thread Sam Rohde
Hi all,

Should there be more metadata in the Coder Proto? For example, adding an
"is_deterministic" boolean field. This will allow for a language-agnostic
way for SDKs to infer properties about a coder received from the expansion
service.

My motivation for this is that I recently ran into a problem in which an
"ExternalCoder" in the Python SDK was erroneously marked as
non-deterministic. The reason is that the Coder proto doesn't have an
"is_deterministic" field, and when the coder fails to be recreated in Python, the
ExternalCoder defaults to False.

Regards,
Sam
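
To make the failure mode concrete, a rough sketch (not the actual ExternalCoder implementation) of why the default matters and how a proto-carried hint like the proposed is_deterministic field could be threaded through; deterministic_hint is an assumed name:

```
import apache_beam as beam


class OpaqueExternalCoder(beam.coders.Coder):
  """Illustrative stand-in for a coder the SDK could not reconstruct."""

  def __init__(self, deterministic_hint=None):
    # deterministic_hint plays the role of the proposed is_deterministic
    # field on the Coder proto; it is not a real Beam field today.
    self._deterministic_hint = deterministic_hint

  def is_deterministic(self):
    # Without metadata we must conservatively answer False, which is what
    # trips the GBK deterministic-input check.
    if self._deterministic_hint is None:
      return False
    return self._deterministic_hint
```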


Re: [BEAM-9322] Python SDK discussion on correct output tag names

2020-04-01 Thread Sam Rohde
On Wed, Apr 1, 2020 at 3:34 PM Robert Bradshaw  wrote:

> E.g. something like https://github.com/apache/beam/pull/11283
>
That looks good. Though it may be better to do the change under the
"passthrough_pcollection_output_ids" experiment flag above it (I'll comment
on this in the PR too).


> On Wed, Apr 1, 2020 at 2:57 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Apr 1, 2020 at 1:48 PM Sam Rohde  wrote:
>>
>>> To restate the original issue it is that the current method of setting
>>> the output tags on PCollections from composites drops the tag information
>>> of the returned PCollections.
>>>
>>
>> Composite PTransforms should *not* be setting output tags on
>> returned PCollecitons; this will break producing outputs from the actual
>> primitive that produces them.
>>
>>
>>> So an expand returning a dict will have its outputs labeled as None, 1,
>>> ..., len(outputs). This is broken because embedded payloads in composites
>>> won't be able to reference the outputs if they differ from the returned
>>> value.
>>>
>>
>> Yes, we need a way for composites to declare their output tags. Currently
>> this is only supported for the multi-output ParDo primitive.
>>
>
>>
>>> In this case, having the restriction of no nesting decreases technical
>>> complexity substantially and always giving the tag unambiguously informs
>>> the SDK what to name the outputs. We can also allow for arbitrary nesting,
>>> we'll just have to figure out an unambiguous naming scheme for the
>>> PCollections.
>>>
>>
>> How about this: if the returned PValue is a dictionary of string ->
>> PCollection, we use the keys as tags. We can extend this naturally to
>> tuples, named tuples, nesting, etc. (though I don't know if there are any
>> hidden assumptions left about having an output labeled None if we want to
>> push this through to completion).
>>
>>
Sorry for my imperfect vocabulary, this is what I was roughly trying (and
failing) to propose.



>
>>>
>>>
>>> On Wed, Apr 1, 2020 at 12:44 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> I'm -1 on this, it adds additional restrictions and I don't see what
>>>> this buys us. (In particular, it doesn't address the original issue.)
>>>>
>>>> On Wed, Apr 1, 2020 at 12:41 PM Sam Rohde  wrote:
>>>>
>>>>> So then how does the proposal sound?
>>>>>
>>>>> Writing again here:
>>>>> PTransform.expand: (...) -> Union[PValue, NamedTuple[str,
>>>>> PCollection], Tuple[str, PCollection], Dict[str, PCollection],
>>>>> DoOutputsTuple]
>>>>>
>>>>> i.e. no arbitrary nesting when outputting from an expand
>>>>>
>>>>> On Tue, Mar 31, 2020 at 5:15 PM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> On Tue, Mar 31, 2020 at 4:13 PM Luke Cwik  wrote:
>>>>>> >
>>>>>> > It is important that composites know how things are named so that
>>>>>> any embedded payloads in the composite PTransform can reference the 
>>>>>> outputs
>>>>>> appropriately.
>>>>>>
>>>>>> Very good point. This is part of the cleanup to treat inputs and
>>>>>> outputs of PCollections as maps rather than lists generally across the
>>>>>> Python representations (which also relates to some of the ugliness
>>>>>> that Cham has been running into with cross-language).
>>>>>>
>>>>>> > On Tue, Mar 31, 2020 at 2:51 PM Robert Bradshaw <
>>>>>> rober...@google.com> wrote:
>>>>>> >>
>>>>>> >> On Tue, Mar 31, 2020 at 1:13 PM Sam Rohde 
>>>>>> wrote:
>>>>>> >> >>>
>>>>>> >> >>> * Don't allow arbitrary nestings returned during expansion,
>>>>>> force composite transforms to always provide an unambiguous name (either 
>>>>>> a
>>>>>> tuple with PCollections with unique tags or a dictionary with untagged
>>>>>> PCollections or a singular PCollection (Java and Go SDKs do this)).
>>>>>> >> >>
>>>>>> >> >> I believe that aligning with Java and Go would be the right way
>>>>>> to go here. I don't know if this would limit expressiveness.
>>>>>>

Re: [BEAM-9322] Python SDK discussion on correct output tag names

2020-04-01 Thread Sam Rohde
To restate the original issue: the current method of setting the
output tags on PCollections from composites drops the tag information of
the returned PCollections. So an expand returning a dict will have its
outputs labeled as None, 1, ..., len(outputs). This is broken because
embedded payloads in composites won't be able to reference the outputs if
they differ from the returned value.

In this case, having the restriction of no nesting decreases technical
complexity substantially and always giving the tag unambiguously informs
the SDK what to name the outputs. We can also allow for arbitrary nesting,
we'll just have to figure out an unambiguous naming scheme for the
PCollections.



On Wed, Apr 1, 2020 at 12:44 PM Robert Bradshaw  wrote:

> I'm -1 on this, it adds additional restrictions and I don't see what this
> buys us. (In particular, it doesn't address the original issue.)
>
> On Wed, Apr 1, 2020 at 12:41 PM Sam Rohde  wrote:
>
>> So then how does the proposal sound?
>>
>> Writing again here:
>> PTransform.expand: (...) -> Union[PValue, NamedTuple[str, PCollection],
>> Tuple[str, PCollection], Dict[str, PCollection], DoOutputsTuple]
>>
>> i.e. no arbitrary nesting when outputting from an expand
>>
>> On Tue, Mar 31, 2020 at 5:15 PM Robert Bradshaw 
>> wrote:
>>
>>> On Tue, Mar 31, 2020 at 4:13 PM Luke Cwik  wrote:
>>> >
>>> > It is important that composites know how things are named so that any
>>> embedded payloads in the composite PTransform can reference the outputs
>>> appropriately.
>>>
>>> Very good point. This is part of the cleanup to treat inputs and
>>> outputs of PCollections as maps rather than lists generally across the
>>> Python representations (which also relates to some of the ugliness
>>> that Cham has been running into with cross-language).
>>>
>>> > On Tue, Mar 31, 2020 at 2:51 PM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> On Tue, Mar 31, 2020 at 1:13 PM Sam Rohde  wrote:
>>> >> >>>
>>> >> >>> * Don't allow arbitrary nestings returned during expansion, force
>>> composite transforms to always provide an unambiguous name (either a tuple
>>> with PCollections with unique tags or a dictionary with untagged
>>> PCollections or a singular PCollection (Java and Go SDKs do this)).
>>> >> >>
>>> >> >> I believe that aligning with Java and Go would be the right way to
>>> go here. I don't know if this would limit expressiveness.
>>> >> >
>>> >> > Yeah this sounds like a much more elegant way of handling this
>>> situation. I would lean towards this limiting expressiveness because there
>>> would be a limit to nesting, but I think that the trade-off with reducing
>>> complexity is worth it.
>>> >> >
>>> >> > So in summary it could be:
>>> >> > PTransform.expand: (...) -> Union[PValue, NamedTuple[str,
>>> PCollection], Tuple[str, PCollection], Dict[str, PCollection],
>>> DoOutputsTuple]
>>> >> >
>>> >> > With the expectation that (pseudo-code):
>>> >> > a_transform = ATransform()
>>> >> >
>>> ATransform.from_runner_api(a_transform.to_runner_api()).outputs.keys() ==
>>> a_transform.outputs.keys()
>>> >> >
>>> >> > Since this changes the Python SDK composite transform API, what
>>> would be the next steps for the community to come to a consensus on this?
>>> >>
>>> >> It seems here we're conflating the nesting of PValue results with the
>>> >> nesting of composite operations.
>>> >>
>>> >> Both examples in the original post have PTransform nesting (a
>>> >> composite) returning a flat tuple. This is completely orthogonal to
>>> >> the idea of a PTransform returning a nested result (such as (pc1,
>>> >> (pc2, pc3))) and forbidding the latter won't solve the former.
>>> >>
>>> >> Currently, with the exception of explicit names given for multi-output
>>> >> ParDos, we simply label the outputs sequentially with 0, 1, 2, 3, ...
>>> >> (Actually, for historical reasons, it's None, 1, 2, 3, ...), no matter
>>> >> the nesting. We could do better, e.g. for the example above, label
>>> >> them "0", "1.0", "1.1", or use the keys in the returned dict, but this
>>> >> is separate from the idea of trying to relate the output tags of
>>> >> composites to the output tags of their inner transforms.
>>> >>
>>> >> - Robert
>>>
>>


Re: Unportable Dataflow Pipeline Questions

2020-04-01 Thread Sam Rohde
Okay cool, so it sounds like the cleanup can be done in two phases: move
the apply_ methods to transform replacements, then move Dataflow onto the
Cloudv1b3 protos. AFAIU, completing phase one will make the Pipeline object
portable? If the InteractiveRunner were to make a Pipeline object, then it
could be passed to the DataflowRunner to run, correct?

On Tue, Mar 31, 2020 at 6:01 PM Robert Burke  wrote:

> +1 to translation from beam pipeline Protos.
>
>  The Go SDK does that currently in dataflowlib/translate.go to handle the
> current Dataflow situation, so it's certainly doable.
>
> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw  wrote:
>
>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde  wrote:
>>
>>> Hi All,
>>>
>>> I am currently investigating making the Python DataflowRunner use a
>>> portable pipeline representation so that we can eventually get rid of the
>>> Pipeline(runner) weirdness.
>>>
>>> In that case, I have a lot of questions about the Python DataflowRunner:
>>>
>>> *PValueCache*
>>>
>>>- Why does this exist?
>>>
>>> This is historical baggage from the (long gone) first direct runner when
>> actual computed PCollections were cached, and the DataflowRunner inherited
>> it.
>>
>>
>>> *DataflowRunner*
>>>
>>>- I see that the DataflowRunner defines some PTransforms as
>>>runner-specific primitives by returning a PCollection.from_(...) in 
>>> apply_
>>>methods. Then in the run_ methods, it references the PValueCache to add
>>>steps.
>>>   - How does this add steps?
>>>   - Where does it cache the values to?
>>>   - How does the runner harness pick up these cached values to
>>>   create new steps?
>>>   - How is this information communicated to the runner harness?
>>>- Why do the following transforms need to be overridden: GroupByKey,
>>>WriteToBigQuery, CombineValues, Read?
>>>
>>> Each of these four has a different implementation on Dataflow.
>>
>>>
>>>- Why doesn't the ParDo transform need to be overridden? I see that
>>>it has a run_ method but no apply_ method.
>>>
>>> apply_ is called at pipeline construction time, all of these should be
>> replaced by PTransformOverrides. run_ is called after pipeline construction
>> to actually build up the dataflow graph.
>>
>>
>>> *Possible fixes*
>>> I was thinking of getting rid of the apply_ and run_ methods and
>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>> make this feasible?
>>>
>>
>> If we're going to overhaul how the runner works, it would be best to make
>> DataflowRunner a direct translator from Beam runner API protos to Cloudv1b3
>> protos, rather than manipulate the intermediate Python representation
>> (which no one wants to change for fear of messing up DataflowRunner and
>> causing headaches for cross-language).
>>
>>
>>


Re: [BEAM-9322] Python SDK discussion on correct output tag names

2020-03-31 Thread Sam Rohde
>
> * Don't allow arbitrary nestings returned during expansion, force
>> composite transforms to always provide an unambiguous name (either a tuple
>> with PCollections with unique tags or a dictionary with untagged
>> PCollections or a singular PCollection (Java and Go SDKs do this)).
>>
>
> I believe that aligning with Java and Go would be the right way to go
> here. I don't know if this would limit expressiveness.
>
Yeah, this sounds like a much more elegant way of handling this situation. I
lean towards thinking this would limit expressiveness, because there would be a
limit to nesting, but I think the trade-off of reduced complexity
is worth it.

So in summary it could be:
PTransform.expand: (...) -> Union[PValue, NamedTuple[str, PCollection],
Tuple[str, PCollection], Dict[str, PCollection], DoOutputsTuple]

With the expectation that (pseudo-code):
a_transform = ATransform()
ATransform.from_runner_api(a_transform.to_runner_api()).outputs.keys() ==
a_transform.outputs.keys()
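
A hedged example of what that contract would look like in practice; the composite and its tags are made up:

```
import apache_beam as beam


class SplitEvenOdd(beam.PTransform):
  """Hypothetical composite whose expand returns a flat dict.

  Under the proposal, the dict keys ('even', 'odd') become the composite's
  output tags and survive a to_runner_api/from_runner_api round trip.
  """

  def expand(self, pcoll):
    even, odd = pcoll | beam.Partition(lambda x, _: x % 2, 2)
    return {'even': even, 'odd': odd}
```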

Since this changes the Python SDK composite transform API, what would be
the next steps for the community to come to a consensus on this?

-Sam


On Thu, Mar 26, 2020 at 12:52 PM Udi Meiri  wrote:

>
>
> On Thu, Mar 26, 2020 at 10:13 AM Luke Cwik  wrote:
>
>> The issue seems to be that a PCollection can have a "tag" associated with
>> it and PTransform expansion can return an arbitrary nested dictionary/tuple
>> yet we need to figure out what the user wanted as the local name for the
>> PCollection from all this information.
>>
>> Will this break people who rely on the generated PCollection output tags?
>> One big question is whether a composite transform cares about the name
>> that is used. For primitive transforms such as ParDo, this is very much a
>> yes because the pickled code likely references that name in some way. Some
>> composites could have the same need where the payload that is stored as
>> part of the composite references these local names and hence we have to
>> tell people how to instruct the SDK during transform expansion about what
>> name will be used unambiguously (as long as we document and have tests
>> around this we can choose from many options). Finally, in the XLang world,
>> we need to preserve the names that were provided to us and not change them;
>> which is more about making the Python SDK handle XLang transform expansion
>> carefully.
>>
>> Am I missing edge cases?
>> Concatenation of strings leads to collisions if the delimiter character
>> is used within the tags or map keys. You could use an escaping encoding to
>> guarantee that the concatenation always generates unique names.
>>
>> Some alternatives I thought about were:
>> * Don't allow arbitrary nestings returned during expansion, force
>> composite transforms to always provide an unambiguous name (either a tuple
>> with PCollections with unique tags or a dictionary with untagged
>> PCollections or a singular PCollection (Java and Go SDKs do this)).
>>
>
> I believe that aligning with Java and Go would be the right way to go
> here. I don't know if this would limit expressiveness.
>
>
>> * Have a "best" effort naming system (note the example I give can have
>> many of the "rules" re-ordered) e.g. if all the PCollection tags are unique
>> then use only them, followed by if a flat dictionary is returned then use
>> only the keys as names, followed by if a flat tuple is returned then use
>> indices, and finally fallback to the hierarchical naming scheme.
>>
>>
>> On Tue, Mar 24, 2020 at 1:07 PM Sam Rohde  wrote:
>>
>>> Hi All,
>>>
>>> *Problem*
>>> I would like to discuss BEAM-9322
>>> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-9322> and the
>>> correct way to set the output tags of a transform with nested PCollections,
>>> e.g. a dict of PCollections, a tuple of dicts of PCollections. Before the
>>> fixing of BEAM-1833 <https://issues.apache.org/jira/browse/BEAM-1833>,
>>> the Python SDK when applying a PTransform would auto-generate the output
>>> tags for the output PCollections even if they are manually set by the user:
>>>
>>> class MyComposite(beam.PTransform):
>>>   def expand(self, pcoll):
>>> a = PCollection.from_(pcoll)
>>> a.tag = 'a'
>>>
>>> b = PCollection.from_(pcoll)
>>> b.tag = 'b'
>>> return (a, b)
>>>
>>> would yield a PTransform with two output PCollection and output tags
>>> with 'None' and '0' instead of 'a' and 'b'. This was corrected for simple
>>

Unportable Dataflow Pipeline Questions

2020-03-31 Thread Sam Rohde
Hi All,

I am currently investigating making the Python DataflowRunner use a
portable pipeline representation so that we can eventually get rid of the
Pipeline(runner) weirdness.

In that case, I have a lot of questions about the Python DataflowRunner:

*PValueCache*

   - Why does this exist?

*DataflowRunner*

   - I see that the DataflowRunner defines some PTransforms as
   runner-specific primitives by returning a PCollection.from_(...) in apply_
   methods. Then in the run_ methods, it references the PValueCache to add
   steps.
  - How does this add steps?
  - Where does it cache the values to?
  - How does the runner harness pick up these cached values to create
  new steps?
  - How is this information communicated to the runner harness?
   - Why do the following transforms need to be overridden: GroupByKey,
   WriteToBigQuery, CombineValues, Read?
   - Why doesn't the ParDo transform need to be overridden? I see that it
   has a run_ method but no apply_ method.

*Possible fixes*
I was thinking of getting rid of the apply_ and run_ methods and replacing
those with a PTransformOverride and a simple PipelineVisitor, respectively.
Is this feasible? Am I missing any assumptions that would make this
infeasible?
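
For concreteness, a rough sketch of the shape such a cleanup could take, assuming the PTransformOverride and PipelineVisitor hooks used below; this is not DataflowRunner code, and _DataflowGroupByKey is a made-up placeholder:

```
import apache_beam as beam
from apache_beam.pipeline import PipelineVisitor, PTransformOverride


class _DataflowGroupByKey(beam.PTransform):
  # Stand-in for a runner-specific GroupByKey primitive (hypothetical).
  def expand(self, pcoll):
    return pcoll | beam.GroupByKey()


class GroupByKeyOverride(PTransformOverride):
  # Replaces GroupByKey at construction time, instead of an apply_ method.
  def matches(self, applied_ptransform):
    return isinstance(applied_ptransform.transform, beam.GroupByKey)

  def get_replacement_transform(self, ptransform):
    return _DataflowGroupByKey()


class StepBuildingVisitor(PipelineVisitor):
  # Walks the constructed pipeline and records one step per transform,
  # replacing the run_ methods.
  def __init__(self):
    self.steps = []

  def visit_transform(self, transform_node):
    self.steps.append(transform_node.full_label)
```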

Regards,
Sam


[BEAM-9322] Python SDK discussion on correct output tag names

2020-03-24 Thread Sam Rohde
Hi All,

*Problem*
I would like to discuss BEAM-9322
 and the
correct way to set the output tags of a transform with nested PCollections,
e.g. a dict of PCollections, a tuple of dicts of PCollections. Before the
fixing of BEAM-1833 , the
Python SDK when applying a PTransform would auto-generate the output tags
for the output PCollections even if they are manually set by the user:

class MyComposite(beam.PTransform):
  def expand(self, pcoll):
a = PCollection.from_(pcoll)
a.tag = 'a'

b = PCollection.from_(pcoll)
b.tag = 'b'
return (a, b)

would yield a PTransform with two output PCollections and output tags of
'None' and '0' instead of 'a' and 'b'. This was corrected for simple cases
like this. However, this fails when the PCollections share the same output
tag (of course). This can happen like so:

class MyComposite(beam.PTransform):
  def expand(self, pcoll):
partition_1 = beam.Partition(pcoll, ...)
partition_2 = beam.Partition(pcoll, ...)
return (partition_1[0], partition_2[0])

With the new code, this leads to an error because both output PCollections
have an output tag of '0'.

*Proposal*
When applying PTransforms to a pipeline (pipeline.py:550) we name the
PCollections according to their position in the tree concatenated with the
PCollection tag and a delimiter. From the first example, the output
PCollections of the applied transform will be: '0.a' and '1.b' because it
is a tuple of PCollections. In the second example, the outputs should be:
'0.0' and '1.0'. In the case of a dict of PCollections, it should simply be
the keys of the dict.
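
A small, purely illustrative helper for the naming scheme described above (this is not code from any PR; the delimiter and the single level of nesting handled here are assumptions):

```
def name_outputs(outputs, delimiter='.'):
  # Illustrative only: name each output by its position concatenated with
  # its PCollection tag, so (a, b) with tags 'a'/'b' becomes '0.a', '1.b',
  # and two partitions both tagged '0' become '0.0', '1.0'. A dict simply
  # uses its keys. Only one level of nesting is handled in this sketch.
  if isinstance(outputs, dict):
    return {str(key): pcoll for key, pcoll in outputs.items()}
  return {
      '%d%s%s' % (index, delimiter, getattr(pcoll, 'tag', None)): pcoll
      for index, pcoll in enumerate(outputs)
  }
```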

What do you think? Am I missing edge cases? Will this be unexpected to
users? Will this break people who rely on the generated PCollection output
tags?

Regards,
Sam


[Interactive Runner] now available on master

2020-03-18 Thread Sam Rohde
Hi All!



I am happy to announce that an improved Interactive Runner is now available
on master. This Python runner allows for the interactive development of
Beam pipelines in a notebook (and IPython) environment.



The runner still has some bugs that need to be fixed, as well as some
refactoring to do, but it is in good enough shape to start using.



Here are the new things you can do with the Interactive Runner:

   - Create and execute pipelines within a REPL
   - Visualize elements as the pipeline is running
   - Materialize PCollections to DataFrames
   - Record unbounded sources for deterministic replay
   - Replay cached unbounded sources including watermark advancements

The code lives in sdks/python/apache_beam/runners/interactive and example
notebooks are in sdks/python/apache_beam/runners/interactive/examples.



To install, use `pip install -e .[interactive]` in your /sdks/python directory.

To run, here’s a quick example:

```
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
words = p | beam.Create(['To', 'be', 'or', 'not', 'to', 'be'])
counts = words | 'count' >> beam.combiners.Count.PerElement()

# Shows a dynamically updating display of the PCollection elements
ib.show(counts)

# We can now visualize the data using standard pandas operations.
df = ib.collect(counts)
print(df.info())
print(df.describe())

# Plot the top-10 counted words
df = df.sort_values(by=1, ascending=False)
df.head(n=10).plot(x=0, y=1)
```



Currently, Batch is supported on any runner. Streaming is only supported on
the DirectRunner (non-FnAPI).



I would like to thank Sindy (@sindyli) and Harsh (@ananvay) for their great
work on the initial implementation; David Yan (@davidyan), who led the
project; Ning (@ningk) and myself (@srohde) for the implementation and
design; and Ahmet (@altay), Daniel (@millsd), Pablo (@pabloem), and Robert
(@robertwb), who all contributed a lot of their time to help with the design
and code reviews.



It was a team effort and we wouldn't have been able to complete it without
the help of everyone involved.



Regards,

Sam


Re: Time precision in Python

2020-02-06 Thread Sam Rohde
Gotcha, I was just surprised by the precision loss. Thanks!

On Thu, Feb 6, 2020 at 1:50 PM Robert Bradshaw  wrote:

> Yes, the inconsistency of timestamp granularity is something that
> hasn't yet been resolved (see previous messages on this list). As long
> as we round consistently, it won't result in out-of-order windows, but
> it may result in timestamp truncation and (for sub-millisecond small
> windows) even window identifiaction.
>
> On Thu, Feb 6, 2020 at 1:42 PM Sam Rohde  wrote:
> >
> > Hi All,
> >
> > I saw that in the Python SDK we encode WindowedValues and Timestamps as
> millis, whereas the TIME_GRANULARITY is defined as 1us. Why do we do this?
> Won't this cause problems using the FnApiRunner as windows might fire out
> of order or something else?
> >
> > Thanks,
> > Sam
>


Time precision in Python

2020-02-06 Thread Sam Rohde
Hi All,

I saw that in the Python SDK we encode WindowedValues and Timestamps as
millis, whereas the TIME_GRANULARITY is defined as 1us. Why do we do this?
Won't this cause problems using the
FnApiRunner as windows might fire out of order or something else?

Thanks,
Sam
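
A tiny illustration of the precision loss in question (plain Python, not Beam code): two timestamps that differ only at microsecond granularity collapse to the same value once encoded at millisecond granularity.

```
MICROS_PER_MILLI = 1000


def to_encoded_millis(micros):
  # Mimics encoding a microsecond timestamp at millisecond granularity.
  return micros // MICROS_PER_MILLI


t1 = 1580000000000250  # some instant, in micros
t2 = t1 + 500          # 500us later

assert to_encoded_millis(t1) == to_encoded_millis(t2)  # distinction is lost
```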


Python2.7 Beam End-of-Life Date

2020-02-04 Thread Sam Rohde
Hi All,

Just curious when Beam will drop support for Python 2.7? Not being able to
use all the nice features of 3.x and appeasing both 2.7 and 3.x linters is
somewhat troublesome. Not to mention that all the nice work for the type
hints will have to be redone for 3.x. It seems the sooner we drop
support, the better.

Regards,
Sam


Re: [VOTE] Beam Mascot animal choice: vote for as many as you want

2019-11-20 Thread Sam Rohde
[ ] Beaver
[ ] Hedgehog
[ ] Lemur
[ ] Owl
[ ] Salmon
[ ] Trout
[x] Robot dinosaur
[ ] Firefly
[ ] Cuttlefish
[ ] Dumbo Octopus
[ ] Angler fish

On Wed, Nov 20, 2019 at 9:22 AM Alex Amato  wrote:

> [ ] Beaver
> [ ] Hedgehog
> [ ] Lemur
> [ ] Owl
> [ ] Salmon
> [ ] Trout
> [X] Robot dinosaur
> [ ] Firefly
> [ ] Cuttlefish
> [ ] Dumbo Octopus
> [ ] Angler fish
>
>
> On Wed, Nov 20, 2019 at 9:15 AM Kirill Kozlov 
> wrote:
>
>> [ ] Beaver
>> [ ] Hedgehog
>> [X] Lemur
>> [X] Owl
>> [ ] Salmon
>> [ ] Trout
>> [ ] Robot dinosaur
>> [ ] Firefly
>> [ ] Cuttlefish
>> [ ] Dumbo Octopus
>> [X] Angler fish
>>
>>
>> On Wed, Nov 20, 2019, 08:38 Cyrus Maden  wrote:
>>
>>> Here's my vote, but I'm curious about the distinction between salmon and
>>> trout mascots :)
>>>
>>> [ ] Beaver
>>> [ ] Hedgehog
>>> [ X] Lemur
>>> [ ] Owl
>>> [ X] Salmon
>>> [ ] Trout
>>> [ ] Robot dinosaur
>>> [ X] Firefly
>>> [ ] Cuttlefish
>>> [ ] Dumbo Octopus
>>> [ X] Angler fish
>>>
>>> On Wed, Nov 20, 2019 at 11:24 AM Allan Wilson 
>>> wrote:
>>>


 On 11/20/19, 8:44 AM, "Ryan Skraba"  wrote:

 *** Vote for as many as you like, using this checklist as a
 template 

 [] Beaver
 [X] Hedgehog
 [X ] Lemur
 [ ] Owl
 [ ] Salmon
 [] Trout
 [ ] Robot dinosaur
 [ ] Firefly
 [ ] Cuttlefish
 [ ] Dumbo Octopus
 [ ] Angler fish





Re: Date/Time Ranges & Protobuf

2019-11-18 Thread Sam Rohde
Also made a JIRA:
https://issues.apache.org/jira/projects/BEAM/issues/BEAM-8738

On Mon, Nov 18, 2019 at 2:32 PM Sam Rohde  wrote:

> Cool I wrote up https://github.com/apache/beam/pull/10146
>
> On Mon, Nov 18, 2019 at 2:09 PM Luke Cwik  wrote:
>
>> Sam, I think doing that makes the most sense right now as we haven't yet
>> had a strong enough consensus to change it so to support all of Beam's
>> timestamps/durations it makes sense to still use the format but work around
>> the limitation that is imposed.
>>
>> On Mon, Nov 18, 2019 at 11:25 AM Sam Rohde  wrote:
>>
>>> Timestamp related question: I want to modify Python's utils/timestamp.py
>>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/timestamp.py>
>>> module to include google.protobuf.timestamp to/from translation methods.
>>> What do you guys think? Now that we know the timestamp.proto is implicitly
>>> RFC3339 compliant, is it right to include translation methods that could
>>> potentially break that compliance (a la min/max watermarks)? We already use
>>> the timestamp.proto in: windows definitions
>>> <https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L44>,
>>> pubsub messages
>>> <https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/pubsub.proto#L32>,
>>> bundle applications
>>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L173>,
>>> metrics
>>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L173>,
>>> and logs
>>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L804>.
>>> Is my change okay?
>>>
>>> On Thu, Nov 14, 2019 at 3:40 PM Luke Cwik  wrote:
>>>
>>>> The timestamps flow both ways since:
>>>> * IO authors are responsible for saying what the watermark timestamp is
>>>> and stateful DoFns also allow for users to set timers in relative and
>>>> processing time domains.
>>>> * Runner authors need to understand and merge these timestamps together
>>>> to compute what the global watermark is for a PCollection.
>>>>
>>>> On Thu, Nov 14, 2019 at 3:15 PM Sam Rohde  wrote:
>>>>
>>>>> My two cents are we just need a proto representation for timestamps
>>>>> and durations that includes units. The underlying library can then
>>>>> determine what to do with it. Then further, we can have a standard across
>>>>> Beam SDKs and Runners of how to interpret the proto. Using a raw int64 for
>>>>> timestamps and durations is confusing and *very very *bug prone (as
>>>>> we have seen in the past).
>>>>>
>>>>> I don't know if this is relevant, but does Apache Beam have any
>>>>> standards surrounding leap years or seconds? If we were to make our own
>>>>> timestamp format, would we have to worry about that? Or is the timestamp
>>>>> supplied to Beam a property of the underlying system giving Beam the
>>>>> timestamp? If it is, then there may be some interop problems between
>>>>> sources.
>>>>>
>>>>> On Wed, Nov 13, 2019 at 10:35 AM Luke Cwik  wrote:
>>>>>
>>>>>> I do agree that Apache Beam can represent dates and times with
>>>>>> arbitrary precision and can do it many different ways.
>>>>>>
>>>>>> My argument has always been about whether we restrict this
>>>>>> range to a common standard to increase interoperability across other
>>>>>> systems. For example, SQL database servers have varying degrees as to 
>>>>>> what
>>>>>> ranges they support:
>>>>>> * Oracle 10[1]: 0001-01-01 to 9999-12-31
>>>>>> * Oracle 11g[2]: Julian era, ranging from January 1, 4712 BCE through
>>>>>> December 31, 9999 CE (Common Era, or 'AD'). Unless BCE ('BC' in the
>>>>>> format mask)
>>>>>> * MySQL[3]: '1000-01-01 00:00:00' to '9999-12-31 23:59:59'
>>>>>> * Microsoft SQL: January 1, 1753, through December 31, 9999 for
>>>>>> datetime[4] and January 1, 1 CE through December 31, 9999 CE for
>>>>>> datetime2[5]
>>>>>>
>>>>>> The comm

Re: Date/Time Ranges & Protobuf

2019-11-18 Thread Sam Rohde
Cool I wrote up https://github.com/apache/beam/pull/10146

On Mon, Nov 18, 2019 at 2:09 PM Luke Cwik  wrote:

> Sam, I think doing that makes the most sense right now as we haven't yet
> had a strong enough consensus to change it, so to support all of Beam's
> timestamps/durations it makes sense to still use the format but work around
> the limitation that is imposed.
>
> On Mon, Nov 18, 2019 at 11:25 AM Sam Rohde  wrote:
>
>> Timestamp related question: I want to modify Python's utils/timestamp.py
>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/timestamp.py>
>> module to include google.protobuf.timestamp to/from translation methods.
>> What do you guys think? Now that we know the timestamp.proto is implicitly
>> RFC3339 compliant, is it right to include translation methods that could
>> potentially break that compliance (a la min/max watermarks)? We already use
>> the timestamp.proto in: windows definitions
>> <https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L44>,
>> pubsub messages
>> <https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/pubsub.proto#L32>,
>> bundle applications
>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L173>,
>> metrics
>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L173>,
>> and logs
>> <https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L804>.
>> Is my change okay?
>>
>> On Thu, Nov 14, 2019 at 3:40 PM Luke Cwik  wrote:
>>
>>> The timestamps flow both ways since:
>>> * IO authors are responsible for saying what the watermark timestamp is
>>> and stateful DoFns also allow for users to set timers in relative and
>>> processing time domains.
>>> * Runner authors need to understand and merge these timestamps together
>>> to compute what the global watermark is for a PCollection.
>>>
>>> On Thu, Nov 14, 2019 at 3:15 PM Sam Rohde  wrote:
>>>
>>>> My two cents are we just need a proto representation for timestamps and
>>>> durations that includes units. The underlying library can then determine
>>>> what to do with it. Then further, we can have a standard across Beam SDKs
>>>> and Runners of how to interpret the proto. Using a raw int64 for timestamps
>>>> and durations is confusing and *very very *bug prone (as we have seen
>>>> in the past).
>>>>
>>>> I don't know if this is relevant, but does Apache Beam have any
>>>> standards surrounding leap years or seconds? If we were to make our own
>>>> timestamp format, would we have to worry about that? Or is the timestamp
>>>> supplied to Beam a property of the underlying system giving Beam the
>>>> timestamp? If it is, then there may be some interop problems between
>>>> sources.
>>>>
>>>> On Wed, Nov 13, 2019 at 10:35 AM Luke Cwik  wrote:
>>>>
>>>>> I do agree that Apache Beam can represent dates and times with
>>>>> arbitrary precision and can do it many different ways.
>>>>>
>>>>> My argument has always been about whether we restrict this
>>>>> range to a common standard to increase interoperability across other
>>>>> systems. For example, SQL database servers have varying degrees as to what
>>>>> ranges they support:
>>>>> * Oracle 10[1]: 0001-01-01 to 9999-12-31
>>>>> * Oracle 11g[2]: Julian era, ranging from January 1, 4712 BCE through
>>>>> December 31, 9999 CE (Common Era, or 'AD'). Unless BCE ('BC' in the format
>>>>> mask)
>>>>> * MySQL[3]: '1000-01-01 00:00:00' to '9999-12-31 23:59:59'
>>>>> * Microsoft SQL: January 1, 1753, through December 31, 9999 for
>>>>> datetime[4] and January 1, 1 CE through December 31, 9999 CE for
>>>>> datetime2[5]
>>>>>
>>>>> The common case of the global window containing timestamps that are
>>>>> before and after all of these supported ranges above means that our users
>>>>> can't represent a global window within a database using its common data
>>>>> types.
>>>>>
>>>>> 1: https://docs.oracle.com/javadb/10.8.3.0/ref/rrefdttlimits.html
>>>>> 2:
>>>>> https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#

Re: Date/Time Ranges & Protobuf

2019-11-18 Thread Sam Rohde
Timestamp related question: I want to modify Python's utils/timestamp.py
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/timestamp.py>
module to include google.protobuf.timestamp to/from translation methods.
What do you guys think? Now that we know the timestamp.proto is implicitly
RFC3339 compliant, is it right to include translation methods that could
potentially break that compliance (a la min/max watermarks)? We already use
the timestamp.proto in: windows definitions
<https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L44>,
pubsub messages
<https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/pubsub.proto#L32>,
bundle applications
<https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L173>,
metrics
<https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L173>,
and logs
<https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L804>.
Is my change okay?
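
For reference, the kind of translation being proposed is roughly the following; this is a sketch, not the code in the PR, converting between Beam's microsecond representation and google.protobuf.Timestamp's seconds/nanos fields:

```
from google.protobuf import timestamp_pb2


def micros_to_proto(micros):
  # google.protobuf.Timestamp stores whole seconds plus non-negative nanos;
  # Python's floor division/modulo gives exactly that split, even for
  # negative (pre-epoch) timestamps.
  seconds, remainder_micros = divmod(micros, 1000000)
  return timestamp_pb2.Timestamp(seconds=seconds, nanos=remainder_micros * 1000)


def proto_to_micros(ts_proto):
  # Truncates sub-microsecond precision, which Beam does not represent.
  return ts_proto.seconds * 1000000 + ts_proto.nanos // 1000
```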

On Thu, Nov 14, 2019 at 3:40 PM Luke Cwik  wrote:

> The timestamps flow both ways since:
> * IO authors are responsible for saying what the watermark timestamp is
> and stateful DoFns also allow for users to set timers in relative and
> processing time domains.
> * Runner authors need to understand and merge these timestamps together to
> compute what the global watermark is for a PCollection.
>
> On Thu, Nov 14, 2019 at 3:15 PM Sam Rohde  wrote:
>
>> My two cents are we just need a proto representation for timestamps and
>> durations that includes units. The underlying library can then determine
>> what to do with it. Then further, we can have a standard across Beam SDKs
>> and Runners of how to interpret the proto. Using a raw int64 for timestamps
>> and durations is confusing and *very very *bug prone (as we have seen in
>> the past).
>>
>> I don't know if this is relevant, but does Apache Beam have any standards
>> surrounding leap years or seconds? If we were to make our own timestamp
>> format, would we have to worry about that? Or is the timestamp supplied to
>> Beam a property of the underlying system giving Beam the timestamp? If it
>> is, then there may be some interop problems between sources.
>>
>> On Wed, Nov 13, 2019 at 10:35 AM Luke Cwik  wrote:
>>
>>> I do agree that Apache Beam can represent dates and times with arbitrary
>>> precision and can do it many different ways.
>>>
>>> My argument has always been about whether we should restrict this range
>>> to a common standard to increase interoperability across other systems. For
>>> example, SQL database servers have varying degrees as to what ranges they
>>> support:
>>> * Oracle 10[1]: 0001-01-01 to 9999-12-31
>>> * Oracle 11g[2]: Julian era, ranging from January 1, 4712 BCE through
>>> December 31, 9999 CE (Common Era, or 'AD'). Unless BCE ('BC' in the format
>>> mask)
>>> * MySQL[3]: '1000-01-01 00:00:00' to '9999-12-31 23:59:59'
>>> * Microsoft SQL: January 1, 1753, through December 31, 9999 for
>>> datetime[4] and January 1, 1 CE through December 31, 9999 CE for datetime2[5]
>>>
>>> The common case of the global window containing timestamps that are
>>> before and after all of these supported ranges above means that our users
>>> can't represent a global window within a database using its common data
>>> types.
>>>
>>> 1: https://docs.oracle.com/javadb/10.8.3.0/ref/rrefdttlimits.html
>>> 2:
>>> https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT413
>>> 3: https://dev.mysql.com/doc/refman/8.0/en/datetime.html
>>> 4:
>>> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15
>>> 5:
>>> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver15
>>>
>>> On Wed, Nov 13, 2019 at 3:28 AM Jan Lukavský  wrote:
>>>
>>>> Hi,
>>>>
>>>> just an idea on these related topics that appear these days - it might
>>>> help to realize that we actually don't need full arithmetic on
>>>> timestamps (the Beam model IMHO doesn't need to know the exact
>>>> difference of two events). What we actually need is a slightly simplified
>>>> algebra. Given two timestamps T1 and T2 and a "duration" (a different type
>>>> from timestamp), we need operations (not 100% sure that this is exhaustive,
>>>&

Re: Date/Time Ranges & Protobuf

2019-11-14 Thread Sam Rohde
My two cents are we just need a proto representation for timestamps and
durations that includes units. The underlying library can then determine
what to do with it. Then further, we can have a standard across Beam SDKs
and Runners of how to interpret the proto. Using a raw int64 for timestamps
and durations is confusing and *very very *bug prone (as we have seen in
the past).

I don't know if this is relevant, but does Apache Beam have any standards
surrounding leap years or seconds? If we were to make our own timestamp
format, would we have to worry about that? Or is the timestamp supplied to
Beam a property of the underlying system giving Beam the timestamp? If it
is, then there may be some interop problems between sources.

On Wed, Nov 13, 2019 at 10:35 AM Luke Cwik  wrote:

> I do agree that Apache Beam can represent dates and times with arbitrary
> precision and can do it many different ways.
>
> My argument has always been about whether we should restrict this range
> to a common standard to increase interoperability across other systems. For
> example, SQL database servers have varying degrees as to what ranges they
> support:
> * Oracle 10[1]: 0001-01-01 to 9999-12-31
> * Oracle 11g[2]: Julian era, ranging from January 1, 4712 BCE through
> December 31, 9999 CE (Common Era, or 'AD'). Unless BCE ('BC' in the format
> mask)
> * MySQL[3]: '1000-01-01 00:00:00' to '9999-12-31 23:59:59'
> * Microsoft SQL: January 1, 1753, through December 31, 9999 for
> datetime[4] and January 1, 1 CE through December 31, 9999 CE for datetime2[5]
>
> The common case of the global window containing timestamps that are before
> and after all of these supported ranges above means that our users can't
> represent a global window within a database using its common data types.
>
> 1: https://docs.oracle.com/javadb/10.8.3.0/ref/rrefdttlimits.html
> 2:
> https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT413
> 3: https://dev.mysql.com/doc/refman/8.0/en/datetime.html
> 4:
> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15
> 5:
> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver15
>
> On Wed, Nov 13, 2019 at 3:28 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> just an idea on these related topics that appear these days - it might
>> help to realize that we actually don't need full arithmetic on
>> timestamps (the Beam model IMHO doesn't need to know the exact
>> difference of two events). What we actually need is a slightly simplified
>> algebra. Given two timestamps T1 and T2 and a "duration" (a different type
>> from timestamp), we need operations (not 100% sure that this is exhaustive,
>> but seems to be):
>>
>>  - is_preceding(T1, T2): bool
>>
>>- important !is_preceding(T1, T2) does NOT imply that is_preceding(T2,
>> T1) - !is_preceding(T1, T2) && !is_preceding(T2, T1) would mean events are
>> _concurrent_
>>
>>- this relation has to be also antisymmetric
>>
>>- given this function we can construct a comparator, where multiple
>> distinct timestamps can be "equal" (or with no particular ordering, which
>> is natural property of time)
>>
>>  - min_timestamp_following(T1, duration): T2
>>
>>- that would return a timestamp for which is_preceding(T1 + duration,
>> T2) would return true and no other timestamp X would exist for which
>> is_preceding(T1 + duration, X) && is_preceding(X, T2) would be true
>>
>>- actually, this function would serve as the definition for the
>> duration object
>>
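
(To make the operations above concrete: for the common case where a timestamp
is an integer count of microseconds and a duration is an integer number of
microseconds, the algebra could be instantiated roughly like this -- purely
illustrative, not an API proposal.)

```python
def is_preceding(t1, t2):
  # Strict "<" is antisymmetric; two equal timestamps come out "concurrent".
  return t1 < t2


def min_timestamp_following(t1, duration):
  # Smallest T2 with is_preceding(t1 + duration, T2): the next representable
  # instant after t1 + duration.
  return t1 + duration + 1
```
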
>> If we can supply this algebra, it seems that we can use any
>> representation of timestamps and intervals. It might be (probably) even
>> possible to let user specify his own type used as timestamps and durations,
>> which could solve the issues of not currently being able to correctly
>> represent timestamps lower than Long.MIN_VALUE (although we can get data
>> for that low timestamps - cosmic microwave background being one example
>> :)). Specifying this algebra actually probably boils down to proposal (3)
>> in Robert's thread [1].
>>
>> Just my 2 cents.
>>
>> Jan
>>
>> [1]
>> https://lists.apache.org/thread.html/1672898393cb0d54a77a879be0fb5725902289a3e5063d0f9ec36fe1@%3Cdev.beam.apache.org%3E
>> On 11/13/19 10:11 AM, jincheng sun wrote:
>>
>> Thanks for bringing up this discussion @Luke.
>>
>> As @Kenn mentioned, in Beam we have defined the constant values for the
>> min/max/end of the global window. I noticed that
>> google.protobuf.Timestamp/Duration is only used in window definitions,
>> such as FixedWindowsPayload, SlidingWindowsPayload, SessionsPayload, etc.
>>
>> I think that both RFC 3339 and Beam's current implementation are big
>> enough to express common window definitions. But users can really
>> define a window size that is outside the scope of RFC 3339. Conceptually,
>> we should not limit the time range for windows (although I think the range of
>> RFC 3339 is big enough in 

Re: Confusing multiple output semantics in Python

2019-11-11 Thread Sam Rohde
I made https://github.com/apache/beam/pull/9954 that explores this.

Thanks for the insight, Ning. Internally, we use a
different representation.

On Thu, Nov 7, 2019 at 2:27 PM Ning Kang  wrote:

> Hi Sam,
>
> Thanks for clarifying the accessor to output when building a pipeline.
>
> Internally, we have AppliedPTransform, where the output is always a
> dictionary:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L770
> And it seems to me that with key 'None', the output will be the main
> output.
>
> Ning.
>
> On Thu, Nov 7, 2019 at 1:39 PM Sam Rohde  wrote:
>
>> Hi All,
>>
>> In the Python SDK there are three ways of representing the output of a
>> PTransform with multiple PCollections:
>>
>>- dictionary: PCollection tag --> PCollection
>>- tuple: index --> PCollection
>>- DoOutputsTuple: tag, index, or field name --> PCollection
>>
>> I find this inconsistent way of accessing multiple outputs to be
>> confusing. Say that you have an arbitrary PTransform with multiple outputs.
>> How do you know how to access an individual output without looking at the
>> source code? *You can't!* Remember there are three representations of
>> multiple outputs. So, you need to look at the output type and determine
>> what the output actually is.
>>
>> What purpose does it serve to have three different ways of representing a
>> single concept of multiple output PCollections?
>>
>> My proposal is to have a single representation analogous to Java's
>> PCollectionTuple. With this new type you will be able to access PCollections
>> by tag with the "[ ]" operator or by field name. It should also up-convert
>> returned tuples, dicts, and DoOutputsTuples from composites into this new
>> type.
>>
>> Full example:
>>
>> class SomeCustomComposite(PTransform):
>>   def expand(self, pcoll):
>>     def my_multi_do_fn(x):
>>       if isinstance(x, int):
>>         yield pvalue.TaggedOutput('number', x)
>>       if isinstance(x, str):
>>         yield pvalue.TaggedOutput('string', x)
>>
>>     def printer(x):
>>       print(x)
>>       yield x
>>
>>     outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()
>>     *return pvalue.PTuple({
>>         'number': outputs.number | beam.ParDo(printer),
>>         'string': outputs.string | beam.ParDo(printer)
>>     })*
>>
>> p = beam.Pipeline()
>> *main = p | SomeCustomComposite()*
>>
>> # Access PCollection by field name.
>> numbers = *main.number* | beam.ParDo(...)
>>
>> # Access PCollection by tag.
>> strings = *main['string']* | beam.ParDo(...)
>>
>> What do you think? Does this clear up the confusion of using multiple
>> output PCollections in Python?
>>
>> Regards,
>> Sam
>>
>


Confusing multiple output semantics in Python

2019-11-07 Thread Sam Rohde
Hi All,

In the Python SDK there are three ways of representing the output of a
PTransform with multiple PCollections:

   - dictionary: PCollection tag --> PCollection
   - tuple: index --> PCollection
   - DoOutputsTuple: tag, index, or field name --> PCollection
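
(For concreteness, roughly how each of those is accessed today; the tags and
variable names are illustrative:)

```python
by_dict = {'number': numbers, 'string': strings}
n = by_dict['number']            # dictionary: lookup by tag only

by_tuple = (numbers, strings)
n = by_tuple[0]                  # tuple: lookup by index only

by_do_outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()
n = by_do_outputs.number         # DoOutputsTuple: by attribute ...
n = by_do_outputs['number']      # ... or by tag
```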

I find this inconsistent way of accessing multiple outputs to be confusing.
Say that you have an arbitrary PTransform with multiple outputs. How do you
know how to access an individual output without looking at the source
code? *You
can't!* Remember there are three representations of multiple outputs. So,
you need to look at the output type and determine what the output actually
is.

What purpose does it serve to have three different ways of representing a
single concept of multiple output PCollections?

My proposal is to have a single representation analogous to Java's
PCollectionTuple. With this new type you will be able to access PCollections
by tag with the "[ ]" operator or by field name. It should also up-convert
returned tuples, dicts, and DoOutputsTuples from composites into this new
type.

Full example:

class SomeCustomComposite(PTransform):
  def expand(self, pcoll):
    def my_multi_do_fn(x):
      if isinstance(x, int):
        yield pvalue.TaggedOutput('number', x)
      if isinstance(x, str):
        yield pvalue.TaggedOutput('string', x)

    def printer(x):
      print(x)
      yield x

    outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()
    *return pvalue.PTuple({
        'number': outputs.number | beam.ParDo(printer),
        'string': outputs.string | beam.ParDo(printer)
    })*

p = beam.Pipeline()
*main = p | SomeCustomComposite()*

# Access PCollection by field name.
numbers = *main.number* | beam.ParDo(...)

# Access PCollection by tag.
strings = *main['string']* | beam.ParDo(...)

What do you think? Does this clear up the confusion of using multiple
output PCollections in Python?

Regards,
Sam


Re: Multiple Outputs from Expand in Python

2019-10-25 Thread Sam Rohde
Talked to Daniel offline and it looks like the Python SDK is missing
PCollection Tuples like the one Java has:
https://github.com/rohdesamuel/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
.

I'll go ahead and implement that for the Python SDK.
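
In the meantime, the closest thing available today is to return a plain dict
of tagged PCollections from expand(), applying a distinct transform to each
output. A rough sketch (the transform, tags, and labels below are made up):

```python
import apache_beam as beam
from apache_beam import pvalue


class SplitAndProcess(beam.PTransform):
  def expand(self, pcoll):
    def route(x):
      if isinstance(x, int):
        yield pvalue.TaggedOutput('numbers', x)
      else:
        yield pvalue.TaggedOutput('strings', x)

    outputs = pcoll | beam.ParDo(route).with_outputs('numbers', 'strings')
    # A different downstream transform per output PCollection:
    return {
        'numbers': outputs.numbers | 'FormatNumbers' >> beam.Map(str),
        'strings': outputs.strings | 'UpperStrings' >> beam.Map(str.upper),
    }
```

A PCollectionTuple analogue would essentially formalize this dict.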

On Thu, Oct 24, 2019 at 5:20 PM Sam Rohde  wrote:

> Hey All,
>
> I'm trying to implement an expand override with multiple output
> PCollections. The kicker is that I want to insert a new transform for each
> output PCollection. How can I do this?
>
> Regards,
> Sam
>


Multiple Outputs from Expand in Python

2019-10-24 Thread Sam Rohde
Hey All,

I'm trying to implement an expand override with multiple output
PCollections. The kicker is that I want to insert a new transform for each
output PCollection. How can I do this?

Regards,
Sam


Re: [design] A streaming Fn API runner for Python

2019-10-15 Thread Sam Rohde
Thanks for picking this up again Pablo, I wrote some small comments
concerning the TestStream.

On Tue, Oct 15, 2019 at 4:42 PM Robert Bradshaw  wrote:

> Very excited to see this! I've added some comments to the doc.
>
> On Tue, Oct 15, 2019 at 3:43 PM Pablo Estrada  wrote:
>
>> I've just been informed that access wasn't open. I've since opened access
>> to it.
>> Thanks
>> -P.
>>
>> On Tue, Oct 15, 2019 at 2:10 PM Pablo Estrada  wrote:
>>
>>> Hello all,
>>> I am planning to work on removing the old BundleBasedDirectRunner, and
>>> expand the FnApiRunner to work on streaming as well as batch.
>>> Currently, the FnApiRunner orders the processing graph topologically,
>>> and "pushes" all the data through each stage in topological order (deferred
>>> inputs such as residuals and timers are immediately pushed to the SDK as
>>> well).
>>> The new design would change from this
>>> push-all-data-through-topologically-sorted-stages model to having queues
>>> for "bundles", or for elements that are awaiting processing, and routing
>>> them to the appropriate bundle processing subgraph.
>>>
>>> The design is here: http://s.apache.org/streaming-fn-runner-py
>>>
>>> I expect
>>>
>>> I'd appreciate comments and everything : )
>>> Best
>>> -P.
>>>
>>
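
A very rough sketch (mine, not taken from the design doc) of the queue-based
routing idea described above: each stage keeps a queue of pending bundles, and
a scheduler loop pulls a ready bundle, runs that stage's bundle processor, and
enqueues the outputs downstream.

```python
import collections


def run_queued(stages, initial):
  # stages: name -> (process_bundle, [downstream stage names])
  # initial: list of (stage name, bundle) pairs to seed the pipeline.
  queues = {name: collections.deque() for name in stages}
  for name, bundle in initial:
    queues[name].append(bundle)

  while any(queues.values()):
    # Pick any stage with pending work; a real runner would also weigh
    # watermarks, timers, and residual roots here.
    name = next(n for n, q in queues.items() if q)
    process_bundle, downstream = stages[name]
    output_bundle = process_bundle(queues[name].popleft())
    for target in downstream:
      queues[target].append(output_bundle)
```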


Re: New JIRA Component Request

2019-10-01 Thread Sam Rohde
Thanks for the quick response Pablo!

On Tue, Oct 1, 2019 at 3:21 PM Pablo Estrada  wrote:

> I've created a runner-py-interactive component:
> https://jira.apache.org/jira/issues/?jql=project+%3D+BEAM+AND+component+%3D+runner-py-interactive
>
> Hope that helps!
> -P.
>
> On Tue, Oct 1, 2019 at 3:16 PM Ning Kang  wrote:
>
>> +1
>> FYI, I'm temporarily using examples-python component.
>>
>> On Tue, Oct 1, 2019 at 3:04 PM Sam Rohde  wrote:
>>
>>> Hi All,
>>>
>>> I am working on improvements to the InteractiveRunner alongside +David
>>> Yan , +Ning Kang , and Alexey
>>> Strokach. I am requesting on behalf of this working group to add a new Jira
>>> component "runner-interactive" as the current list of components is
>>> insufficient.
>>>
>>> Regards,
>>> Sam
>>>
>>


New JIRA Component Request

2019-10-01 Thread Sam Rohde
Hi All,

I am working on improvements to the InteractiveRunner alongside +David
Yan , +Ning Kang , and Alexey
Strokach. I am requesting on behalf of this working group to add a new Jira
component "runner-interactive" as the current list of components is
insufficient.

Regards,
Sam


Re: Add exception handling to MapElements

2019-02-11 Thread Sam Rohde
Sure, I was thinking of treating the apply as a promise (making use of your
CodableException idea as well):

```
PCollection<...> result = words.apply(new SomeUserDoFn())
    .then(new SomeOtherDoFn())
    .then(new OtherDoFn(),
        // Error Handler
        (CodableException<...> e) -> {
          logger.info(e.getMessage());
          return e;
        });
```

The idea is to treat the pipeline with each apply as an asynchronous
operation where each step can either be "fulfilled" or "rejected". The
promises can then be chained together like above.


On Mon, Feb 11, 2019 at 1:47 PM Jeff Klukas  wrote:

> Vallery Lancey's post is definitely one of the viewpoints incorporated
> into this approach. I neglected to include that link in this iteration, but
> it was discussed in the previous thread.
>
> Can you explain more about "another option that adds A+ Promise spec into
> the apply method"? I'm failing to parse what that means.
>
> On Mon, Feb 11, 2019 at 4:23 PM Sam Rohde  wrote:
>
>> Interesting ideas! I think you're really honing in on what the Apache
>> Beam API is missing: error handling for bad data and runtime errors. I like
>> your method because it coalesces all the errors into a single collection to
>> be looked at later. Also easy to add a PAssert on the errors collection.
>>
>> Looks like others are also taking a stab at exception handling:
>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>
>> I would also like to add another option that adds A+ Promise spec into
>> the apply method. This makes exception handling more general than with only
>> the Map method.
>>
>> On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas  wrote:
>>
>>> I'm looking for feedback on a new attempt at implementing an exception
>>> handling interface for map transforms as previously discussed on this list
>>> [0] and documented in JIRA [1]. I'd like for users to be able to pass a
>>> function into MapElements, FlatMapElements, etc. that potentially raises an
>>> exception without having to resort to rolling a completely custom ParDo
>>> with an additional output for failing elements.
>>>
>>> I have a PR open for review [2] that introduces an exception-handling
>>> interface that mimics the existing `into` and `via` methods of MapElements:
>>>
>>>  Result<PCollection<Integer>, String> result = words.apply(
>>>  MapElements.into(TypeDescriptors.integers())
>>> .via((String word) -> 1 / word.length()) // throws
>>> ArithmeticException
>>> .withExceptions() // returns a MapWithFailures transform
>>> .into(TypeDescriptors.strings())
>>> .via(ee -> ee.exception().getMessage()));
>>>  PCollection<String> errors = result.errors();
>>>  PCollection<Integer> output = result.output();
>>>
>>>
>>>
>>> The example above is a bit more complex than I'd like, but gives users
>>> full control over what type handled exceptions are transformed into. It
>>> would be nice if we could simply create an error collection of some type
>>> that wraps the input element and the Exception directly, but there is still
>>> no general solution for encoding subclasses of exception, thus the need for
>>> some exception handling function (which in this example is the lambda
>>> passed to the second `via`).
>>>
>>> Let's call the above option 1.
>>>
>>> If we expect that most users will simply want to preserve the input
>>> element that failed and know general metadata about the exception
>>> (className, message, and stackTrace), we could instead optimize for a
>>> default solution where we return an instance of a new
>>> CodableException[InputT] type that wraps the input element and has
>>> additional string fields for className, message, and stackTrace:
>>>
>>>  Result<PCollection<Integer>, CodableException<String>> result = words.apply(
>>>  MapElements.into(TypeDescriptors.integers())
>>> .via((String word) -> 1 / word.length())
>>> .withExceptions());
>>>  PCollection<CodableException<String>> errors = result.errors();
>>>  PCollection<Integer> output = result.output();
>>>
>>> Let's call this option 2.
>>>
>>> It's less user code compared to option 1 and returns a richer error
>>> collection. I believe we'd be able to handle setting an appropriate coder
>>> behind the scenes, setting some composite coder that reuses the coder for
>>> the input collection in order to encode the InputT instance.
>>>
>&

Re: Correct way to implement ProcessBundleProgressResponse in the Java SDK

2019-02-11 Thread Sam Rohde
Yeah, take a look at the ProcessRemoteBundleOperation.java class. This is the
class that is in charge of handling bundle execution. You can create a new
implementation of BundleProgressHandler.java and give that to the
ProcessRemoteBundleOperation in the BeamFnMapTaskExecutorFactory.java.
Currently the progress handler is "ignored", so you will be the first
implementation using ExecutableStages.

On Tue, Feb 5, 2019 at 11:09 AM Alex Amato  wrote:

> I filed a JIRA to introduce ProcesSBundleProgressResponses, which are not
> yet implemented in the java SDK. I just wanted to make sure what I put down
> here is the correct approach?
>
> I wrote down a few places in the code where I thought it might be needed in
> https://jira.apache.org/jira/browse/BEAM-6597
>
>  I recall Luke once suggesting that this could be done by overrding some
> progress method that already existed. In ProcessBundleHandler or some layer
> which invokes it? But I could be making this up.
>
> I was wondering if anyone else had a good sense for this.
>


Re: [PROPOSAL] Prepare Beam 2.11.0 release

2019-02-11 Thread Sam Rohde
Thanks Ahmet! The 2.11.0 release will also be using the revised release
process from PR-7529  that I
authored. Let me know if you have any questions or if I can help in any
way. I would love feedback on how to improve on the modifications I made
and the release process in general.

On Fri, Feb 8, 2019 at 9:20 AM Scott Wegner  wrote:

> +1, thanks Ahmet! Let us know if you need any help.
>
>
> On Fri, Feb 8, 2019 at 8:09 AM Alan Myrvold  wrote:
>
>> Thanks for volunteering. Glad to see the schedule being kept.
>>
>> On Fri, Feb 8, 2019 at 5:58 AM Maximilian Michels  wrote:
>>
>>> +1 We should keep up the release cycle even though we are a bit late
>>> with 2.10.0
>>>
>>> On 08.02.19 02:10, Andrew Pilloud wrote:
>>> > +1 let's keep things going. Thanks for volunteering!
>>> >
>>> > Andrew
>>> >
>>> > On Thu, Feb 7, 2019, 4:49 PM Kenneth Knowles >> > > wrote:
>>> >
>>> > I think this is a good idea. Even though 2.10.0 is still making
>>> it's
>>> > way out, there's still 6 weeks of changes between the cut dates,
>>> > lots of value. It is good to keep the rhythm and follow the
>>> calendar.
>>> >
>>> > Kenn
>>> >
>>> > On Thu, Feb 7, 2019, 15:52 Ahmet Altay >> >  wrote:
>>> >
>>> > Hi all,
>>> >
>>> > Beam 2.11 release branch cut date is 2/13 according to the
>>> > release calendar [1]. I would like to volunteer myself to do
>>> > this release. I intend to cut the branch on the planned 2/13
>>> date.
>>> >
>>> > If you have releasing blocking issues for 2.11 please mark
>>> their
>>> > "Fix Version" as 2.11.0. (I also created a 2.12.0 release in
>>> > JIRA in case you would like to move any non-blocking issues to
>>> > that version.)
>>> >
>>> > What do you think?
>>> >
>>> > Ahmet
>>> >
>>> > [1]
>>> >
>>> https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com=America%2FLos_Angeles
>>> >
>>>
>>
>
> --
>
>
>
>
> Got feedback? tinyurl.com/swegner-feedback
>


Re: Add exception handling to MapElements

2019-02-11 Thread Sam Rohde
Interesting ideas! I think you're really honing in on what the Apache Beam
API is missing: error handling for bad data and runtime errors. I like your
method because it coalesces all the errors into a single collection to be
looked at later. Also easy to add a PAssert on the errors collection.

Looks like others are also taking a stab at exception handling:
https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
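
For comparison, the hand-rolled dead-letter pattern that exists in the Python
SDK today -- catch inside the DoFn and route failures to a tagged output --
looks roughly like this (the class, tags, and the words collection are
illustrative, mirroring the 1 / word.length() example):

```python
import apache_beam as beam
from apache_beam import pvalue


class MapWithFailures(beam.DoFn):  # illustrative, not an existing Beam class
  def __init__(self, fn):
    self._fn = fn

  def process(self, element):
    try:
      yield self._fn(element)
    except Exception as e:  # route the bad element instead of failing the bundle
      yield pvalue.TaggedOutput('failures', (element, str(e)))


results = words | beam.ParDo(
    MapWithFailures(lambda w: 1 // len(w))).with_outputs(
        'failures', main='output')
good, failed = results.output, results.failures
```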

I would also like to add another option that adds A+ Promise spec into the
apply method. This makes exception handling more general than with only the
Map method.

On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas  wrote:

> I'm looking for feedback on a new attempt at implementing an exception
> handling interface for map transforms as previously discussed on this list
> [0] and documented in JIRA [1]. I'd like for users to be able to pass a
> function into MapElements, FlatMapElements, etc. that potentially raises an
> exception without having to resort to rolling a completely custom ParDo
> with an additional output for failing elements.
>
> I have a PR open for review [2] that introduces an exception-handling
> interface that mimics the existing `into` and `via` methods of MapElements:
>
>  Result<PCollection<Integer>, String> result = words.apply(
>  MapElements.into(TypeDescriptors.integers())
> .via((String word) -> 1 / word.length()) // throws
> ArithmeticException
> .withExceptions() // returns a MapWithFailures transform
> .into(TypeDescriptors.strings())
> .via(ee -> ee.exception().getMessage()));
>  PCollection<String> errors = result.errors();
>  PCollection<Integer> output = result.output();
>
>
>
> The example above is a bit more complex than I'd like, but gives users
> full control over what type handled exceptions are transformed into. It
> would be nice if we could simply create an error collection of some type
> that wraps the input element and the Exception directly, but there is still
> no general solution for encoding subclasses of exception, thus the need for
> some exception handling function (which in this example is the lambda
> passed to the second `via`).
>
> Let's call the above option 1.
>
> If we expect that most users will simply want to preserve the input
> element that failed and know general metadata about the exception
> (className, message, and stackTrace), we could instead optimize for a
> default solution where we return an instance of a new
> CodableException[InputT] type that wraps the input element and has
> additional string fields for className, message, and stackTrace:
>
>  Result<PCollection<Integer>, CodableException<String>> result = words.apply(
>  MapElements.into(TypeDescriptors.integers())
> .via((String word) -> 1 / word.length())
> .withExceptions());
>  PCollection<CodableException<String>> errors = result.errors();
>  PCollection<Integer> output = result.output();
>
> Let's call this option 2.
>
> It's less user code compared to option 1 and returns a richer error
> collection. I believe we'd be able to handle setting an appropriate coder
> behind the scenes, setting some composite coder that reuses the coder for
> the input collection in order to encode the InputT instance.
>
> I think we'd still need to provide some additional methods, though, if the
> user wants to set a custom exception handling function and custom coder.
> That would be for needs where a user wants to catch only a particular
> subclass of exception, or access additional methods of Exception (to access
> getCause() perhaps) or methods particular to an Exception subclass. The
> full API would end up being more complex compared to option 1, but it does
> make the default case much easier to use.
>
> If it's not fairly obvious what's going on in either of the above
> examples, then we likely haven't figured out an appropriate API yet.
> Reviews on the PR or commentary on the above two options would be
> appreciated.
>
> [0]
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
> 
> [1] https://issues.apache.org/jira/browse/BEAM-5638
> [2] https://github.com/apache/beam/pull/7736
>


Re: Thoughts on a reference runner to invest in?

2019-02-11 Thread Sam Rohde
Thanks for starting this thread. If I had to guess, I would say there is
more demand for Python, as it's more widely used for data science and
analytics. Being pragmatic, the FnApiRunner already has more feature work
done than the Java reference runner, so we should go with that.

-Sam

On Fri, Feb 8, 2019 at 10:07 AM Daniel Oliveira 
wrote:

> Hello Beam dev community,
>
> For those who don't know me, I work for Google and I've been working on
> the Java reference runner, which is a portable, local Java runner (it's
> basically the direct runner with the portability APIs implemented). Our
> goal in working on this was to have a portable runner which ran locally so
> it could be used by users for testing portable pipelines, devs for testing
> new features with portability, and for runner authors to provide a simple
> reference implementation of a portable runner.
>
> Due to various circumstances though, progress on the Java reference runner
> has been pretty slow, and a Python runner which does pretty much the same
> things was made to aid portability development in Python (called the
> FnApiRunner). This runner is currently further along in feature work than
> the Java reference runner, so we've been reevaluating if we should switch
> to investing in it instead.
>
> My question to the community is: Which runner do you think would be more
> valuable to the dev community and Beam users? For those of you who are
> runner authors, do you have a preference for what language you'd like to
> see a reference implementation in?
>
> Thanks,
> Daniel Oliveira
>


Re: Magic number explanation in ParDoTest.java

2019-01-22 Thread Sam Rohde
Thanks for digging up the PR. I'm still confused as to why that magic
number is still there though. Why is there an expectation that the
timestamp from the timer is *exactly *1774ms past
BoundedWindow.TIMESTAMP_MIN_VALUE?

On Tue, Jan 22, 2019 at 10:43 AM Kenneth Knowles  wrote:

> The commit comes from this PR: https://github.com/apache/beam/pull/2273
>
> Kenn
>
> On Tue, Jan 22, 2019 at 10:21 AM Sam Rohde  wrote:
>
>> Hi all,
>>
>> Does anyone have context why there is a magic number of "1774"
>> milliseconds in the ParDoTest.java on line 2618? This is in
>> the testEventTimeTimerAlignBounded method.
>>
>> File at master:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L2618
>>
>> First added commit:
>> https://github.com/apache/beam/commit/4f934923d28798dfe7cd18c86ff4bcf8eebc27e5
>>
>> Regards,
>> Sam
>>
>


Magic number explanation in ParDoTest.java

2019-01-22 Thread Sam Rohde
Hi all,

Does anyone have context why there is a magic number of "1774" milliseconds
in the ParDoTest.java on line 2618? This is in
the testEventTimeTimerAlignBounded method.

File at master:
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L2618

First added commit:
https://github.com/apache/beam/commit/4f934923d28798dfe7cd18c86ff4bcf8eebc27e5

Regards,
Sam


Re: Beam JobService Problem

2019-01-15 Thread Sam Rohde
On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw  wrote:

> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka  wrote:
> >
> > Thanks Sam for bringing this to the list.
> >
> > As preparation_ids are not reusable, having preparation_id and job_id
> same makes sense to me for Flink.
>
> I think we change the protocol and only have one kind of ID. As well
> as solving the problem at hand, it also simplifies the API.
>
That sounds fantastic.

On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw  wrote:

> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka  wrote:

> Another option is to have a subscription for all states/messages on the
> JobServer.
> The problem is forcing the job service to remember all logs that were
> ever logged ever in case someone requests them at some future date.
> Best to have a way to register a listener earlier.

I agree with Robert that the caller should be in charge of what to do
with the generated monitoring data. This is especially true with long-running
jobs that generate potentially gigabytes worth of logs.

I made https://issues.apache.org/jira/browse/BEAM-6442 to track this. Let
me know if I missed anything.

On Tue, Jan 15, 2019 at 5:23 AM Robert Bradshaw  wrote:

> On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka  wrote:
> >
> > Thanks Sam for bringing this to the list.
> >
> > As preparation_ids are not reusable, having preparation_id and job_id
> same makes sense to me for Flink.
>
> I think we change the protocol and only have one kind of ID. As well
> as solving the problem at hand, it also simplifies the API.
>
> > Another option is to have a subscription for all states/messages on the
> JobServer.
>
> The problem is forcing the job service to remember all logs that were
> ever logged ever in case someone requests them at some future date.
> Best to have a way to register a listener earlier.
>
> > This will be similar to "docker". As the container id is created after
> the container creation, the only way to get the container creation even is
> to start "docker events" before starting a container.
> >
> > On Mon, Jan 14, 2019 at 11:13 AM Maximilian Michels 
> wrote:
> >>
> >> Hi Sam,
> >>
> >> Good observation. Looks like we should fix that.
> >>
> >> Looking at InMemoryJobService, it appears that the state can only be
> retrieved
> >> by the client once the job is running with a job/invocation id
> associated.
> >> Indeed, any messages until that could be lost.
> >>
> >> For Flink the JobId is generated here:
> >>
> https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64
> >>
> >> I don't see any benefit of having two separate IDs, as the IDs are
> already
> >> scoped by preparation and invocation phase.
> >>
> >> - Would it be possible to just pass the preparation id as the
> invocation id at
> >> JobInvoker#invoke(..)?
> >>
> >> - Alternatively, we could have an additional prepare phase for
> JobInvoker to get
> >> the job id for the invocation, before we start the job.
> >>
> >> Thanks,
> >> Max
> >>
> >> On 14.01.19 12:39, Sam Rohde wrote:
> >> > Hello all,
> >> >
> >> > While going through the codebase I noticed a problem with the Beam
> JobService.
> >> > In particular, the API allows for the possibility of never seeing
> some messages
> >> > or states with Get(State|Message)Stream. This is because the
> >> > Get(State|Message)Stream calls need to have the job id which can only
> be
> >> > obtained from the RunJobResponse. But in order to see all
> messages/states the
> >> > streams need to be opened before the job starts.
> >> >
> >> > This is fine in Dataflow as the preparation_id == job_id, but this is
> not true
> >> > in Flink. What do you all think of this? Am I misunderstanding
> something?
> >> >
> >> > Thanks,
> >> > Sam
> >> >
>


Re: Add all tests to release validation

2019-01-15 Thread Sam Rohde
+Boyuan Zhang  who is modifying the rc validation script

I'm thinking of a small change to the proposed process, brought to my
attention by Boyuan.

Instead of running the additional validation tests during the rc
validation, run the tests and the proposed process after the release branch
has been cut. A couple of reasons why:

   - The additional validation tests (PostCommit and PreCommit) don't run
   against the RC and are instead run against the branch. This is confusing
   considering the other tests in the RC validation step are per RC.
   - The additional validation tests are expensive.

The final release process would look like:

   - Decide to release
   - Create a new version in JIRA
   - Triage release-blocking issue in JIRAs
   - Review release notes in JIRA
   - Create a release branch
   - Verify that a release builds
   - >>> Verify that a release passes its tests <<< (this is where the new
   process would be added)
   - Build/test/fix RCs
   - >>> Fix any issues <<< (all JIRAs created during the new process will
   have to be closed by here)
   - Finalize the release
   - Promote the release




On Thu, Jan 10, 2019 at 4:32 PM Kenneth Knowles  wrote:

> What do you think about crowd-sourcing?
>
> 1. Fix Version = 2.10.0
> 2. If assigned, ping ticket and maybe assignee, unassign if unresponsive
> 3. If unassigned, assign it to yourself while thinking about it
> 4. If you can route it a bit closer to someone who might know, great
> 5. If it doesn't look like a blocker (after routing best you can), Fix
> Version = 2.11.0
>
> I think this has enough mutexes that there should be no duplicated work if
> it is followed. And every step is a standard use of Fix Version and
> Assignee so there's not really special policy needed.
>
> Kenn
>
> On Thu, Jan 10, 2019 at 4:25 PM Mikhail Gryzykhin 
> wrote:
>
>> +1
>>
>> Although we should be cautious when enabling this policy. We have decent
>> backlog of bugs that we need to plumb through.
>>
>> --Mikhail
>>
>> Have feedback <http://go/migryz-feedback>?
>>
>>
>> On Thu, Jan 10, 2019 at 11:44 AM Scott Wegner  wrote:
>>
>>> +1, this sounds good to me.
>>>
>>> I believe the next step would be to open a PR to add this to the release
>>> guide:
>>> https://github.com/apache/beam/blob/master/website/src/contribute/release-guide.md
>>>
>>> On Wed, Jan 9, 2019 at 12:04 PM Sam Rohde  wrote:
>>>
>>>> Cool, thanks for all of the replies. Does this summary sound reasonable?
>>>>
>>>> *Problem:* there are a number of failing tests (including flaky) that
>>>> don't get looked at, and aren't necessarily green upon cutting a new Beam
>>>> release.
>>>>
>>>> *Proposed Solution:*
>>>>
>>>>- Add all tests to the release validation
>>>>- For all failing tests (including flaky) create a JIRA attached to
>>>>the Beam release and add to the "test-failures" component*
>>>>- If a test is continuously failing
>>>>  - fix it
>>>>  - add fix to release
>>>>  - close out JIRA
>>>>   - If a test is flaky
>>>>  - try and fix it
>>>>  - If fixed
>>>> - add fix to release
>>>> - close out JIRA
>>>>  - else
>>>> - manually test it
>>>> - modify "Fix Version" to next release
>>>>  - The release validation can continue when all JIRAs are
>>>>closed out.
>>>>
>>>> *Why this is an improvement:*
>>>>
>>>>- Ensures that every test is a valid signal (as opposed to
>>>>disabling failing tests)
>>>>- Creates an incentive to automate tests (no longer on the hook to
>>>>manually test)
>>>>- Creates a forcing-function to fix flaky tests (once fixed, no
>>>>longer needs to be manually tested)
>>>>- Ensures that every failing test gets looked at
>>>>
>>>> *Why this may not be an improvement:*
>>>>
>>>>- More effort for release validation
>>>>- May slow down release velocity
>>>>
>>>> * for brevity, this might be better to create a JIRA per component
>>>> containing a summary of failing tests
>>>>
>>>>
>>>> -Sam
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jan 8, 2019 at 10:25 AM Ahmet Altay  w

Beam JobService Problem

2019-01-14 Thread Sam Rohde
Hello all,

While going through the codebase I noticed a problem with the Beam
JobService. In particular, the API allows for the possibility of never
seeing some messages or states with Get(State|Message)Stream. This is
because the  Get(State|Message)Stream calls need to have the job id which
can only be obtained from the RunJobResponse. But in order to see all
messages/states the streams need to be opened before the job starts.
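
A sketch of the race in client terms (stub and message names follow
beam_job_api.proto as I read it; treat the exact fields as approximate):

```python
prepare = stub.Prepare(
    beam_job_api_pb2.PrepareJobRequest(pipeline=pipeline_proto))
run = stub.Run(
    beam_job_api_pb2.RunJobRequest(preparation_id=prepare.preparation_id))

# Only now do we know run.job_id, so anything the runner emitted between
# Run() returning and this stream being opened can already be gone:
for msg in stub.GetMessageStream(
    beam_job_api_pb2.JobMessagesRequest(job_id=run.job_id)):
  print(msg)
```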

This is fine in Dataflow as the preparation_id == job_id, but this is not
true in Flink. What do you all think of this? Am I misunderstanding
something?

Thanks,
Sam


Re: Add all tests to release validation

2019-01-09 Thread Sam Rohde
s fixed or
>>>>> otherwise not reproducible.
>>>>>
>>>>> For automation, I wonder if there's something automatic already
>>>>> available somewhere that would:
>>>>>
>>>>>  - mark the Jenkins build to "Keep This Build Forever"
>>>>>  - be *very* careful to try to find an existing bug, else it will be
>>>>> spam
>>>>>  - file bugs to "test-failures" component
>>>>>  - set Fix Version to the "next" - right now we have 2.7.1 (LTS),
>>>>> 2.11.0 (next mainline), 3.0.0 (dreamy incompatible ideas) so need the
>>>>> smarts to choose 2.11.0
>>>>>
>>>>> If not, I think doing this stuff manually is not that bad, assuming we
>>>>> can stay fairly green.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Jan 7, 2019 at 3:20 PM Sam Rohde  wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> There are a number of tests in our system that are either flaky or
>>>>>> permanently red. I am suggesting to add, if not all, then most of the 
>>>>>> tests
>>>>>> (style, unit, integration, etc) to the release validation step. In this
>>>>>> way, we will add a regular cadence to ensuring greenness and no flaky 
>>>>>> tests
>>>>>> in Beam.
>>>>>>
>>>>>> There are a number of ways of implementing this, but what I think
>>>>>> might work the best is to set up a process that either manually or
>>>>>> automatically creates a JIRA for the failing test and assigns it to a
>>>>>> component tagged with the release number. The release can then continue
>>>>>> when all JIRAs are closed by either fixing the failure or manually 
>>>>>> testing
>>>>>> to ensure no adverse side effects (this is in case there are 
>>>>>> environmental
>>>>>> issues in the testing infrastructure or otherwise).
>>>>>>
>>>>>> Thanks for reading, what do you think?
>>>>>> - Is there another, easier way to ensure that no test failures go
>>>>>> unfixed?
>>>>>> - Can the process be automated?
>>>>>> - What am I missing?
>>>>>>
>>>>>> Regards,
>>>>>> Sam
>>>>>>
>>>>>>
>>>
>>> --
>>>
>>>
>>>
>>>
>>> Got feedback? tinyurl.com/swegner-feedback
>>>
>>


Add all tests to release validation

2019-01-07 Thread Sam Rohde
Hi All,

There are a number of tests in our system that are either flaky or
permanently red. I am suggesting to add, if not all, then most of the tests
(style, unit, integration, etc) to the release validation step. In this
way, we will add a regular cadence to ensuring greenness and no flaky tests
in Beam.

There are a number of ways of implementing this, but what I think might
work the best is to set up a process that either manually or automatically
creates a JIRA for the failing test and assigns it to a component tagged
with the release number. The release can then continue when all JIRAs are
closed by either fixing the failure or manually testing to ensure no
adverse side effects (this is in case there are environmental issues in the
testing infrastructure or otherwise).

Thanks for reading, what do you think?
- Is there another, easier way to ensure that no test failures go unfixed?
- Can the process be automated?
- What am I missing?

Regards,
Sam


Re: Is portable.DirectGroupByKey supposed to be private?

2018-10-17 Thread Sam Rohde
Example:
org.apache.beam.runners.direct.portable.DirectGroupByKey is not declared
public (so it's package-private by default), meaning that it can't be used
from the org.apache.beam.runners.direct package.

On Wed, Oct 17, 2018 at 11:25 AM Sam Rohde  wrote:

> Hi, I'm working on deduplicating code from when the portable worker code
> was donated to the project. I found that the portable DirectGroupByKey and
> its inner classes are private. Is this by design? If so, why? If not, then
> I'm going to change it to be public.
>
> Regards,
> Sam
>


Is portable.DirectGroupByKey supposed to be private?

2018-10-17 Thread Sam Rohde
Hi, I'm working on deduplicating code from when the portable worker code was
donated to the project. I found that the portable DirectGroupByKey and its
inner classes are private. Is this by design? If so, why? If not, then
I'm going to change it to be public.

Regards,
Sam


Re: A new contributor

2018-10-04 Thread Sam Rohde
Done

On Thu, Oct 4, 2018 at 4:43 PM Mikhail Gryzykhin  wrote:

> I believe this list will also require:
>
> * Access to: https://cwiki.apache.org/confluence/display/BEAM/ \
> Sam, please create account, then ping here so that committer grants
> you access rights. (I believe this is correct order)
> * Refresh link to join Slack.
> I've invited Sam directly. Since we always need to refresh join slack
> link, I guess it is easier to change instruction to just ask committer to
> add new contributors to slack channel.
> * @Kenneth Knowles  Thank you for creating Jira account.
>
> --Mikhail
>
> Have feedback <http://go/migryz-feedback>?
>
>
> On Thu, Oct 4, 2018 at 12:06 PM Kenneth Knowles  wrote:
>
>> I added you to the Contributor role, so you can be assigned JIRAs.
>> (assuming your JIRA username is samrohde)
>>
>> Kenn
>>
>> On Thu, Oct 4, 2018 at 11:31 AM Sam Rohde  wrote:
>>
>>> Hi all!
>>>
>>> My name is Sam and I work for Google Cloud Dataflow. I'm going to be
>>> starting work on Apache Beam soon and I wish to be added as a contributor
>>> in the Beam JIRA issue tracker and to be given any other permissions
>>> necessary to start work.
>>>
>>> Thanks,
>>> Sam
>>>
>>


A new contributor

2018-10-04 Thread Sam Rohde
Hi all!

My name is Sam and I work for Google Cloud Dataflow. I'm going to be
starting work on Apache Beam soon and I wish to be added as a contributor
in the Beam JIRA issue tracker and to be given any other permissions
necessary to start work.

Thanks,
Sam