Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-07 Thread Robert Burke
+1 to clear documentation of what the various Payloads are allowed to
reference. Outside of those, the various proto fields are pretty clear by
the proto structure, but the payloads are an escape hatch to anything.

On Tue, Jul 7, 2020 at 2:52 PM Luke Cwik  wrote:

> Kenn, all the runner based PTransform replacement/fusion/... works on the
> proto version of the pipeline. Only at the final stages where we need to
> reify certain objects for execution do we reuse the non-proto based objects
> (well known coders, window fns, ...). The conversion back to Java
> PCollection/Coder/PTransform/... objects and then back to proto again had
> similar issues to what Python is experiencing. Unfortunately, Beam Java is
> not well prepared to be the caller of the XLang expansion service as it
> currently does not remember the components map as part of the pipeline so
> that the ids can be propagated correctly. I think there would be a good
> amount of code that would need to change related to pipeline construction
> to fix this. Also, the alpha conversion would have been a lot simpler if we
> had forced ids to be globally unique otherwise each local -> id mapping
> needs to be scoped to what it is representing.
>
> Cham, I was looking to see what people were thinking in this space and to
> share it with the wider community and I think making sure that an object ->
> id context map is part of the pipeline construction makes a lot of sense as
> a solution to preserve these ids. The other part of this is that we need to
> document what these opaque blobs representing windows fns, do fns, ... are
> allowed to reference since runners and SDKs would have the ability to
> deduplicate common protos. We do have one level of redirection for
> inputs/outputs on PTransforms since it was recognized early on that runners
> will need to have the ability to rewire PTransforms. We also have it on
> user state, timers and side inputs on ParDoPayload since runners may need
> to rewrite the coder associated with them to add length prefixing when
> necessary. This all ties into documenting what can be saved inside these
> opaque blobs and what SDKs and runners can assume about what is legal to do
> to the pipeline proto representation.
>
> On Thu, Jul 2, 2020 at 10:19 AM Robert Bradshaw 
> wrote:
>
>> I just finished reading BEAM-10143 and pr/12067, and I agree this is
>> exactly the issue. The inability to do alpha conversions is exactly why the
>> namespace prefix was introduced for external transforms, but the fact that
>> the caller uses an ephemeral context to create ids for the input
>> PCollections (and their coders, windowing strategies, etc.) when calling
>> external transforms, which is again distinct from that used to convert the
>> whole pipeline, causes issues here.
>>
>> Keeping this mapping around for the lifetime of the pipeline, as is done
>> in pr/12067, seems a reasonable solution. Another possibility is to create
>> these ephemeral contexts with the prescribed namespace in the caller, and
>> leverage the fact that we can alpha convert the PCollections, to avoid
>> collisions for stuff like this. (The coders and windowing strategies would
>> be duplicated in the graph, so that's less than ideal.)
>>
>>
>> On Thu, Jul 2, 2020 at 10:03 AM Kenneth Knowles  wrote:
>>
>>> I had a very long chat with Brian about this the other day over
>>> https://github.com/apache/beam/pull/12067
>>>
>>> The most important piece of background: ** the ids are just pointers [1]
>>> and every other use of them is a misuse of the structure **. Equivalently,
>>> they are bound variables and the binding is in the Components. Using the
>>> name for any purpose other than referring to the component is a misuse [2]
>>> of the structure.
>>>
>>> For anyone that has not done language programming, this is the same as
>>> managing bindings. It is sometimes tricky to get right, but it is also
>>> solved [3]. The term to search for is "alpha conversion" and there are many
>>> resources. I think talking about unique ids muddles the issue. These are
>>> locally unique for exactly the same mundane reason that variables have
>>> different names.
>>>
>>> However, it is more complicated for Beam, because payloads by design can
>>> be opaque, so they cannot be alpha converted. This is true even without
>>> xlang. Once a payload exists within a proto with components, it cannot be
>>> separated from those components since you cannot (in general) know which
>>> are referenced. This design could be revised to allow alpha conversion of
>>> opaque payloads by exposing ids via a local layer of indirection.
>>>
>>> Suppose you have components with "id_a", "id_b", ...
>>> Instead of { urn: "xyz", payload: <>> "id_a", "id_b", ...>> }
>>> We could encode as { urn: "xyz", references: { "local_a": "id_a",
>>> "local_b": "id_b", ... }, payload: <>> bindings "local_a", "local_b", ...>> }
>>>
>>> With this change we can alpha convert opaque payloads by altering the
>>> values in 

Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-07 Thread Luke Cwik
Kenn, all the runner based PTransform replacement/fusion/... works on the
proto version of the pipeline. Only at the final stages where we need to
reify certain objects for execution do we reuse the non-proto based objects
(well known coders, window fns, ...). The conversion back to Java
PCollection/Coder/PTransform/... objects and then back to proto again had
similar issues to what Python is experiencing. Unfortunately, Beam Java is
not well prepared to be the caller of the XLang expansion service as it
currently does not remember the components map as part of the pipeline so
that the ids can be propagated correctly. I think there would be a good
amount of code that would need to change related to pipeline construction
to fix this. Also, the alpha conversion would have been a lot simpler if we
had forced ids to be globally unique otherwise each local -> id mapping
needs to be scoped to what it is representing.

Cham, I was looking to see what people were thinking in this space and to
share it with the wider community and I think making sure that an object ->
id context map is part of the pipeline construction makes a lot of sense as
a solution to preserve these ids. The other part of this is that we need to
document what these opaque blobs representing windows fns, do fns, ... are
allowed to reference since runners and SDKs would have the ability to
deduplicate common protos. We do have one level of redirection for
inputs/outputs on PTransforms since it was recognized early on that runners
will need to have the ability to rewire PTransforms. We also have it on
user state, timers and side inputs on ParDoPayload since runners may need
to rewrite the coder associated with them to add length prefixing when
necessary. This all ties into documenting what can be saved inside these
opaque blobs and what SDKs and runners can assume about what is legal to do
to the pipeline proto representation.

On Thu, Jul 2, 2020 at 10:19 AM Robert Bradshaw  wrote:

> I just finished reading BEAM-10143 and pr/12067, and I agree this is
> exactly the issue. The inability to do alpha conversions is exactly why the
> namespace prefix was introduced for external transforms, but the fact that
> the caller uses an ephemeral context to create ids for the input
> PCollections (and their coders, windowing strategies, etc.) when calling
> external transforms, which is again distinct from that used to convert the
> whole pipeline, causes issues here.
>
> Keeping this mapping around for the lifetime of the pipeline, as is done
> in pr/12067, seems a reasonable solution. Another possibility is to create
> these ephemeral contexts with the prescribed namespace in the caller, and
> leverage the fact that we can alpha convert the PCollections, to avoid
> collisions for stuff like this. (The coders and windowing strategies would
> be duplicated in the graph, so that's less than ideal.)
>
>
> On Thu, Jul 2, 2020 at 10:03 AM Kenneth Knowles  wrote:
>
>> I had a very long chat with Brian about this the other day over
>> https://github.com/apache/beam/pull/12067
>>
>> The most important piece of background: ** the ids are just pointers [1]
>> and every other use of them is a misuse of the structure **. Equivalently,
>> they are bound variables and the binding is in the Components. Using the
>> name for any purpose other than referring to the component is a misuse [2]
>> of the structure.
>>
>> For anyone that has not done language programming, this is the same as
>> managing bindings. It is sometimes tricky to get right, but it is also
>> solved [3]. The term to search for is "alpha conversion" and there are many
>> resources. I think talking about unique ids muddles the issue. These are
>> locally unique for exactly the same mundane reason that variables have
>> different names.
>>
>> However, it is more complicated for Beam, because payloads by design can
>> be opaque, so they cannot be alpha converted. This is true even without
>> xlang. Once a payload exists within a proto with components, it cannot be
>> separated from those components since you cannot (in general) know which
>> are referenced. This design could be revised to allow alpha conversion of
>> opaque payloads by exposing ids via a local layer of indirection.
>>
>> Suppose you have components with "id_a", "id_b", ...
>> Instead of { urn: "xyz", payload: <> "id_a", "id_b", ...>> }
>> We could encode as { urn: "xyz", references: { "local_a": "id_a",
>> "local_b": "id_b", ... }, payload: <> bindings "local_a", "local_b", ...>> }
>>
>> With this change we can alpha convert opaque payloads by altering the
>> values in the "references" map. I am not advocating for such a major change
>> now, but this might help to think about things.
>>
>> I think the simple guideline of "only translate once, and keep the
>> Components around" will be enough.
>>
>> The issue as I came to understand it is as simple as this: if you pass a
>> parameter to a PTransform by way of xlang/runner API, then the 

Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-02 Thread Robert Bradshaw
I just finished reading BEAM-10143 and pr/12067, and I agree this is
exactly the issue. The inability to do alpha conversions is exactly why the
namespace prefix was introduced for external transforms, but the fact that
the caller uses an ephemeral context to create ids for the input
PCollections (and their coders, windowing strategies, etc.) when calling
external transforms, which is again distinct from that used to convert the
whole pipeline, causes issues here.

Keeping this mapping around for the lifetime of the pipeline, as is done
in pr/12067, seems a reasonable solution. Another possibility is to create
these ephemeral contexts with the prescribed namespace in the caller, and
leverage the fact that we can alpha convert the PCollections, to avoid
collisions for stuff like this. (The coders and windowing strategies would
be duplicated in the graph, so that's less than ideal.)


On Thu, Jul 2, 2020 at 10:03 AM Kenneth Knowles  wrote:

> I had a very long chat with Brian about this the other day over
> https://github.com/apache/beam/pull/12067
>
> The most important piece of background: ** the ids are just pointers [1]
> and every other use of them is a misuse of the structure **. Equivalently,
> they are bound variables and the binding is in the Components. Using the
> name for any purpose other than referring to the component is a misuse [2]
> of the structure.
>
> For anyone that has not done language programming, this is the same as
> managing bindings. It is sometimes tricky to get right, but it is also
> solved [3]. The term to search for is "alpha conversion" and there are many
> resources. I think talking about unique ids muddles the issue. These are
> locally unique for exactly the same mundane reason that variables have
> different names.
>
> However, it is more complicated for Beam, because payloads by design can
> be opaque, so they cannot be alpha converted. This is true even without
> xlang. Once a payload exists within a proto with components, it cannot be
> separated from those components since you cannot (in general) know which
> are referenced. This design could be revised to allow alpha conversion of
> opaque payloads by exposing ids via a local layer of indirection.
>
> Suppose you have components with "id_a", "id_b", ...
> Instead of { urn: "xyz", payload: < "id_a", "id_b", ...>> }
> We could encode as { urn: "xyz", references: { "local_a": "id_a",
> "local_b": "id_b", ... }, payload: < bindings "local_a", "local_b", ...>> }
>
> With this change we can alpha convert opaque payloads by altering the
> values in the "references" map. I am not advocating for such a major change
> now, but this might help to think about things.
>
> I think the simple guideline of "only translate once, and keep the
> Components around" will be enough.
>
> The issue as I came to understand it is as simple as this: if you pass a
> parameter to a PTransform by way of xlang/runner API, then the resulting
> payloads are able to have ids embedded from both the Components you sent
> and the Components you got back. You have to keep all of them or you may
> produce unbound variables. You always initialize a new pipeline or pipeline
> fragment with the Components gathered so far and their associations.
>
> In the Python SDK this is complicated by a lot of this stuff being
> controlled and passed in to to_runner_api which could be too late.
>
> In the Java SDK this is probably complicated by the separation of
> runners-core-construction, though I don't know if that has bitten yet.
>
> Kenn
>
> [1] The reason for using this approach at all is to represent a graph in
> proto and also for saving space by reusing things like coders and windowing
> strategies (Dataflow's v1b3 has major size issues). I don't know that there
> is another way to do this.
> [2] Since Dataflow's v1b3 API design has major space problems, the v1b3
> message was redefined to be inside the scope of the Components bindings.
> Now the message has no meaning outside of it.
> [3] I use "solved" in the same sense as "breadth-first search is solved".
> We know how to do it, there is not much to discuss, and if you try to do it
> some other way you are probably making a mistake.
>
> On Thu, Jul 2, 2020 at 9:04 AM Robert Bradshaw 
> wrote:
>
>> I agree this has been a major source of pain. The primary cause of issues
>> is the conversion from Beam protos back to the SDK objects (which doesn't
>> always have a good representation, especially for foreign-language
>> components). In my experience, the SDK objects -> Beam Proto conversions
>> aren't generally a problem (except sometimes for objects that were formerly
>> converted from protos).
>>
>> In my opinion, the solution is to convert to Beam protos and never
>> convert back. (Well, not until we get to the workers, but at that point we
>> can confidently say everything we need to decode does actually belong to
>> the ambient environment.) As mentioned, Dataflow is being fixed, and the
>> only other 

Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-02 Thread Kenneth Knowles
I had a very long chat with Brian about this the other day over
https://github.com/apache/beam/pull/12067

The most important piece of background: ** the ids are just pointers [1]
and every other use of them is a misuse of the structure **. Equivalently,
they are bound variables and the binding is in the Components. Using the
name for any purpose other than referring to the component is a misuse [2]
of the structure.

For anyone that has not done language programming, this is the same as
managing bindings. It is sometimes tricky to get right, but it is also
solved [3]. The term to search for is "alpha conversion" and there are many
resources. I think talking about unique ids muddles the issue. These are
locally unique for exactly the same mundane reason that variables have
different names.

However, it is more complicated for Beam, because payloads by design can be
opaque, so they cannot be alpha converted. This is true even without xlang.
Once a payload exists within a proto with components, it cannot be
separated from those components since you cannot (in general) know which
are referenced. This design could be revised to allow alpha conversion of
opaque payloads by exposing ids via a local layer of indirection.

Suppose you have components with "id_a", "id_b", ...
Instead of { urn: "xyz", payload: <> }
We could encode as { urn: "xyz", references: { "local_a": "id_a",
"local_b": "id_b", ... }, payload: <> }

With this change we can alpha convert opaque payloads by altering the
values in the "references" map. I am not advocating for such a major change
now, but this might help to think about things.

I think the simple guideline of "only translate once, and keep the
Components around" will be enough.

The issue as I came to understand it is as simple as this: if you pass a
parameter to a PTransform by way of xlang/runner API, then the resulting
payloads are able to have ids embedded from both the Components you sent
and the Components you got back. You have to keep all of them or you may
produce unbound variables. You always initialize a new pipeline or pipeline
fragment with the Components gathered so far and their associations.

In the Python SDK this is complicated by a lot of this stuff being
controlled and passed in to to_runner_api which could be too late.

In the Java SDK this is probably complicated by the separation of
runners-core-construction, though I don't know if that has bitten yet.

Kenn

[1] The reason for using this approach at all is to represent a graph in
proto and also for saving space by reusing things like coders and windowing
strategies (Dataflow's v1b3 has major size issues). I don't know that there
is another way to do this.
[2] Since Dataflow's v1b3 API design has major space problems, the v1b3
message was redefined to be inside the scope of the Components bindings.
Now the message has no meaning outside of it.
[3] I use "solved" in the same sense as "breadth-first search is solved".
We know how to do it, there is not much to discuss, and if you try to do it
some other way you are probably making a mistake.

On Thu, Jul 2, 2020 at 9:04 AM Robert Bradshaw  wrote:

> I agree this has been a major source of pain. The primary cause of issues
> is the conversion from Beam protos back to the SDK objects (which doesn't
> always have a good representation, especially for foreign-language
> components). In my experience, the SDK objects -> Beam Proto conversions
> aren't generally a problem (except sometimes for objects that were formerly
> converted from protos).
>
> In my opinion, the solution is to convert to Beam protos and never convert
> back. (Well, not until we get to the workers, but at that point we can
> confidently say everything we need to decode does actually belong to the
> ambient environment.) As mentioned, Dataflow is being fixed, and the only
> other runner (in Python) that doesn't consume the Beam protos directly is
> the old direct runner (which Pablo is working on making obsolete, and
> doesn't support cross language anyway). So finally fixing dataflow should
> be all we need to do. (Have we seen these issues on other runners?)
>
> On the Java side, I think all the optimization stuff works on its SDK
> representation so care needs to be done to make that conversion faithful or
> convert that code to act on the protos directly.
>
> As for why we went with the current approach, it's simply the fact that
> SDK representation -> Dataflow v1beta3 predated any of the beam protos
> stuff, and re-using that code seemed easier than updating the Dataflow
> service to accept Beam protos (or re-writing it) as we had the SDK
> representation for everything in hand (until cross-langauge came along that
> is).
>
>
> On Thu, Jul 2, 2020 at 3:11 AM Heejong Lee  wrote:
>
>>
>>
>> On Wed, Jul 1, 2020 at 7:18 PM Robert Burke  wrote:
>>
>>> From the Go SDK side, it was built that way nearly from the start.
>>> Historically there was a direct SDK rep -> Dataflow rep conversion, but
>>> 

Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-02 Thread Chamikara Jayalath
For Python and Java SDKs, SDK object -> Beam proto conversion and SDK
object -> Dataflow job request conversion were developed independently and
both were subsequently updated to support x-lang.

AFAIK SDK object -> Beam proto conversion was developed when we first added
support for Beam runner API protos to the SDK. In Python SDK we basically
iteratively invoke to_runner_api() implementations of  various SDK graph
objects to build the full Beam pipeline proto. To support x-lang we updated
to_runner_api() method of ExternalTransform (implemented
in to_runner_api_transform for transforms) to attach the pipeline segment
received from the external SDK during this process.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L386
This has been working fairly well but Brian recently found an issue related
to unique ID collisions: https://github.com/apache/beam/pull/12067

Luke, do you think there's a fundamental issue with the above that we have
to fix in addition to direct Beam pipeline proto to Dataflow proto
conversion that we are already working on ? Note that this particular piece
of code is used by all runners when using x-lang.

Thanks,
Cham


On Thu, Jul 2, 2020 at 9:04 AM Robert Bradshaw  wrote:

> I agree this has been a major source of pain. The primary cause of issues
> is the conversion from Beam protos back to the SDK objects (which doesn't
> always have a good representation, especially for foreign-language
> components). In my experience, the SDK objects -> Beam Proto conversions
> aren't generally a problem (except sometimes for objects that were formerly
> converted from protos).
>
> In my opinion, the solution is to convert to Beam protos and never convert
> back. (Well, not until we get to the workers, but at that point we can
> confidently say everything we need to decode does actually belong to the
> ambient environment.) As mentioned, Dataflow is being fixed, and the only
> other runner (in Python) that doesn't consume the Beam protos directly is
> the old direct runner (which Pablo is working on making obsolete, and
> doesn't support cross language anyway). So finally fixing dataflow should
> be all we need to do. (Have we seen these issues on other runners?)
>
> On the Java side, I think all the optimization stuff works on its SDK
> representation so care needs to be done to make that conversion faithful or
> convert that code to act on the protos directly.
>
> As for why we went with the current approach, it's simply the fact that
> SDK representation -> Dataflow v1beta3 predated any of the beam protos
> stuff, and re-using that code seemed easier than updating the Dataflow
> service to accept Beam protos (or re-writing it) as we had the SDK
> representation for everything in hand (until cross-langauge came along that
> is).
>
>
> On Thu, Jul 2, 2020 at 3:11 AM Heejong Lee  wrote:
>
>>
>>
>> On Wed, Jul 1, 2020 at 7:18 PM Robert Burke  wrote:
>>
>>> From the Go SDK side, it was built that way nearly from the start.
>>> Historically there was a direct SDK rep -> Dataflow rep conversion, but
>>> that's been replaced with a SDK rep -> Beam Proto -> Dataflow rep
>>> conversion.
>>>
>>> In particular, this approach had a few benefits: easier to access local
>>> context for pipeline validation at construction time, to permit as early a
>>> failure as possible, which might be easier with native language constructs
>>> vs beam representations of them.(Eg. DoFns not matching ParDo & Collection
>>> types, and similar)
>>> Protos are convenient, but impose certain structure on how the pipeline
>>> graph is handled. (This isn't to say an earlier conversion isn't possible,
>>> one can do almost anything in code, but it lets the structure be optimised
>>> for this case.)
>>>
>>> The big advantage of translating from Beam proto -> to Dataflow Rep is
>>> that the Dataflow Rep can get the various unique IDs that are mandated for
>>> the Beam proto process.
>>>
>>> However, the same can't really be said for the other way around.  A good
>>> question is "when should the unique IDs be assigned?"
>>>
>>
>> This is very true and I would like to elaborate more on the source of
>> friction when using external transforms. As Robert mentioned, pipeline
>> proto refers to each component by unique IDs and the unique ID is only
>> assigned when we convert SDK pipeline object to pipeline proto. Before
>> XLang, pipeline object to pipeline proto conversion happened one time
>> during the job submission phase. However, after XLang transform was
>> introduced, it also happens when we request expansion of external
>> transforms to the expansion service. Unique ID generated for the expansion
>> request can be embedded in the returning external proto and conflicted
>> later with other unique IDs generated for the job submission.
>>
>>
>>>
>>> While I'm not working on adding XLang to the Go SDK directly (that would
>>> be our wonderful intern, Kevin),  I've kind of pictured that the 

Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-02 Thread Robert Bradshaw
I agree this has been a major source of pain. The primary cause of issues
is the conversion from Beam protos back to the SDK objects (which doesn't
always have a good representation, especially for foreign-language
components). In my experience, the SDK objects -> Beam Proto conversions
aren't generally a problem (except sometimes for objects that were formerly
converted from protos).

In my opinion, the solution is to convert to Beam protos and never convert
back. (Well, not until we get to the workers, but at that point we can
confidently say everything we need to decode does actually belong to the
ambient environment.) As mentioned, Dataflow is being fixed, and the only
other runner (in Python) that doesn't consume the Beam protos directly is
the old direct runner (which Pablo is working on making obsolete, and
doesn't support cross language anyway). So finally fixing dataflow should
be all we need to do. (Have we seen these issues on other runners?)

On the Java side, I think all the optimization stuff works on its SDK
representation so care needs to be done to make that conversion faithful or
convert that code to act on the protos directly.

As for why we went with the current approach, it's simply the fact that SDK
representation -> Dataflow v1beta3 predated any of the beam protos stuff,
and re-using that code seemed easier than updating the Dataflow service to
accept Beam protos (or re-writing it) as we had the SDK representation for
everything in hand (until cross-langauge came along that is).


On Thu, Jul 2, 2020 at 3:11 AM Heejong Lee  wrote:

>
>
> On Wed, Jul 1, 2020 at 7:18 PM Robert Burke  wrote:
>
>> From the Go SDK side, it was built that way nearly from the start.
>> Historically there was a direct SDK rep -> Dataflow rep conversion, but
>> that's been replaced with a SDK rep -> Beam Proto -> Dataflow rep
>> conversion.
>>
>> In particular, this approach had a few benefits: easier to access local
>> context for pipeline validation at construction time, to permit as early a
>> failure as possible, which might be easier with native language constructs
>> vs beam representations of them.(Eg. DoFns not matching ParDo & Collection
>> types, and similar)
>> Protos are convenient, but impose certain structure on how the pipeline
>> graph is handled. (This isn't to say an earlier conversion isn't possible,
>> one can do almost anything in code, but it lets the structure be optimised
>> for this case.)
>>
>> The big advantage of translating from Beam proto -> to Dataflow Rep is
>> that the Dataflow Rep can get the various unique IDs that are mandated for
>> the Beam proto process.
>>
>> However, the same can't really be said for the other way around.  A good
>> question is "when should the unique IDs be assigned?"
>>
>
> This is very true and I would like to elaborate more on the source of
> friction when using external transforms. As Robert mentioned, pipeline
> proto refers to each component by unique IDs and the unique ID is only
> assigned when we convert SDK pipeline object to pipeline proto. Before
> XLang, pipeline object to pipeline proto conversion happened one time
> during the job submission phase. However, after XLang transform was
> introduced, it also happens when we request expansion of external
> transforms to the expansion service. Unique ID generated for the expansion
> request can be embedded in the returning external proto and conflicted
> later with other unique IDs generated for the job submission.
>
>
>>
>> While I'm not working on adding XLang to the Go SDK directly (that would
>> be our wonderful intern, Kevin),  I've kind of pictured that the process
>> was to provide the Expansion service with unique placeholders if unable to
>> provide the right IDs, and substitute them in returned pipeline graph
>> segment afterwards, once that is known. That is, we can be relatively
>> certain that the expansion service will be self consistent, but it's the
>> SDK requesting the expansion's responsibility to ensure they aren't
>> colliding with the primary SDKs pipeline ids.
>>
>
> AFAIK, we're already doing this in Java and Python SDKs. Not providing a
> "placeholder" but remembering which pipeline object maps to which unique ID
> used in the expanded component proto.
>
>
>>
>> Otherwise, we could probably recommend a translation protocol (if one
>> doesn't exist already, it probably does) and when XLang expansions are to
>> happen in the SDK -> beam proto process. So something like Pass 1, intern
>> all coders and Pcollections, Pass 2 intern all DoFns and environments, Pass
>> 3 expand Xlang, ... Etc.
>>
>
> Not sure I understand correctly but a following transform who consumes the
> output of an external transform needs some information like the output
> pcollection information from the expanded external transform during the
> pipeline construction phase.
>
>
>> The other half of this is when happens when Going from Beam proto a
>> -> SDK? This happens during pipeline execution, 

Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-02 Thread Heejong Lee
On Wed, Jul 1, 2020 at 7:18 PM Robert Burke  wrote:

> From the Go SDK side, it was built that way nearly from the start.
> Historically there was a direct SDK rep -> Dataflow rep conversion, but
> that's been replaced with a SDK rep -> Beam Proto -> Dataflow rep
> conversion.
>
> In particular, this approach had a few benefits: easier to access local
> context for pipeline validation at construction time, to permit as early a
> failure as possible, which might be easier with native language constructs
> vs beam representations of them.(Eg. DoFns not matching ParDo & Collection
> types, and similar)
> Protos are convenient, but impose certain structure on how the pipeline
> graph is handled. (This isn't to say an earlier conversion isn't possible,
> one can do almost anything in code, but it lets the structure be optimised
> for this case.)
>
> The big advantage of translating from Beam proto -> to Dataflow Rep is
> that the Dataflow Rep can get the various unique IDs that are mandated for
> the Beam proto process.
>
> However, the same can't really be said for the other way around.  A good
> question is "when should the unique IDs be assigned?"
>

This is very true and I would like to elaborate more on the source of
friction when using external transforms. As Robert mentioned, pipeline
proto refers to each component by unique IDs and the unique ID is only
assigned when we convert SDK pipeline object to pipeline proto. Before
XLang, pipeline object to pipeline proto conversion happened one time
during the job submission phase. However, after XLang transform was
introduced, it also happens when we request expansion of external
transforms to the expansion service. Unique ID generated for the expansion
request can be embedded in the returning external proto and conflicted
later with other unique IDs generated for the job submission.


>
> While I'm not working on adding XLang to the Go SDK directly (that would
> be our wonderful intern, Kevin),  I've kind of pictured that the process
> was to provide the Expansion service with unique placeholders if unable to
> provide the right IDs, and substitute them in returned pipeline graph
> segment afterwards, once that is known. That is, we can be relatively
> certain that the expansion service will be self consistent, but it's the
> SDK requesting the expansion's responsibility to ensure they aren't
> colliding with the primary SDKs pipeline ids.
>

AFAIK, we're already doing this in Java and Python SDKs. Not providing a
"placeholder" but remembering which pipeline object maps to which unique ID
used in the expanded component proto.


>
> Otherwise, we could probably recommend a translation protocol (if one
> doesn't exist already, it probably does) and when XLang expansions are to
> happen in the SDK -> beam proto process. So something like Pass 1, intern
> all coders and Pcollections, Pass 2 intern all DoFns and environments, Pass
> 3 expand Xlang, ... Etc.
>

Not sure I understand correctly but a following transform who consumes the
output of an external transform needs some information like the output
pcollection information from the expanded external transform during the
pipeline construction phase.


> The other half of this is when happens when Going from Beam proto a
> -> SDK? This happens during pipeline execution, but at least in the Go SDK
> partly happens when creating the Dataflow rep. In particular, Coder
> reference values only have a populated ID when they've been "rehydrated"
> from the Beam proto, since the Beam Proto is the first place where such IDs
> are correctly assigned.
>
> Tl;dr; i think the right question to sort out is when should IDs be
> expected to be assigned and available during pipeline construction.
>
> On Wed, Jul 1, 2020, 6:34 PM Luke Cwik  wrote:
>
>> It seems like we keep running into translation issues with XLang due to
>> how it is represented in the SDK. (e.g. Brian's work on context map due to
>> loss of coder ids, Heejong's work related to missing environment ids on
>> windowing strategies).
>>
>> I understand that there is an effort that is Dataflow specific where the
>> conversion of the Beam proto -> Dataflow API (v1b3) will help with some
>> issues but it still requires the SDK pipeline representation -> Beam proto
>> to occur correctly which won't be fixed by the Dataflow specific effort.
>>
>> Why did we go with the current approach?
>>
>> What other ways could we do this?
>>
>


Re: XLang sub-graph representation within the SDKs pipeline types

2020-07-01 Thread Robert Burke
>From the Go SDK side, it was built that way nearly from the start.
Historically there was a direct SDK rep -> Dataflow rep conversion, but
that's been replaced with a SDK rep -> Beam Proto -> Dataflow rep
conversion.

In particular, this approach had a few benefits: easier to access local
context for pipeline validation at construction time, to permit as early a
failure as possible, which might be easier with native language constructs
vs beam representations of them.(Eg. DoFns not matching ParDo & Collection
types, and similar)
Protos are convenient, but impose certain structure on how the pipeline
graph is handled. (This isn't to say an earlier conversion isn't possible,
one can do almost anything in code, but it lets the structure be optimised
for this case.)

The big advantage of translating from Beam proto -> to Dataflow Rep is that
the Dataflow Rep can get the various unique IDs that are mandated for the
Beam proto process.

However, the same can't really be said for the other way around.  A good
question is "when should the unique IDs be assigned?"

While I'm not working on adding XLang to the Go SDK directly (that would be
our wonderful intern, Kevin),  I've kind of pictured that the process was
to provide the Expansion service with unique placeholders if unable to
provide the right IDs, and substitute them in returned pipeline graph
segment afterwards, once that is known. That is, we can be relatively
certain that the expansion service will be self consistent, but it's the
SDK requesting the expansion's responsibility to ensure they aren't
colliding with the primary SDKs pipeline ids.

Otherwise, we could probably recommend a translation protocol (if one
doesn't exist already, it probably does) and when XLang expansions are to
happen in the SDK -> beam proto process. So something like Pass 1, intern
all coders and Pcollections, Pass 2 intern all DoFns and environments, Pass
3 expand Xlang, ... Etc.

The other half of this is when happens when Going from Beam proto a
-> SDK? This happens during pipeline execution, but at least in the Go SDK
partly happens when creating the Dataflow rep. In particular, Coder
reference values only have a populated ID when they've been "rehydrated"
from the Beam proto, since the Beam Proto is the first place where such IDs
are correctly assigned.

Tl;dr; i think the right question to sort out is when should IDs be
expected to be assigned and available during pipeline construction.

On Wed, Jul 1, 2020, 6:34 PM Luke Cwik  wrote:

> It seems like we keep running into translation issues with XLang due to
> how it is represented in the SDK. (e.g. Brian's work on context map due to
> loss of coder ids, Heejong's work related to missing environment ids on
> windowing strategies).
>
> I understand that there is an effort that is Dataflow specific where the
> conversion of the Beam proto -> Dataflow API (v1b3) will help with some
> issues but it still requires the SDK pipeline representation -> Beam proto
> to occur correctly which won't be fixed by the Dataflow specific effort.
>
> Why did we go with the current approach?
>
> What other ways could we do this?
>


XLang sub-graph representation within the SDKs pipeline types

2020-07-01 Thread Luke Cwik
It seems like we keep running into translation issues with XLang due to how
it is represented in the SDK. (e.g. Brian's work on context map due to loss
of coder ids, Heejong's work related to missing environment ids on
windowing strategies).

I understand that there is an effort that is Dataflow specific where the
conversion of the Beam proto -> Dataflow API (v1b3) will help with some
issues but it still requires the SDK pipeline representation -> Beam proto
to occur correctly which won't be fixed by the Dataflow specific effort.

Why did we go with the current approach?

What other ways could we do this?