Thanks for the remarks. Correct, we do not need the static URN at all in
the payload. We can pass the transform URN with the ExternalTransform as
part of the ExpansionRequest. So this is sufficient for the Proto:
message ConfigValue {
  string coder_urn = 1;
  bytes payload = 2;
}

message ExternalTransformPayload {
  map<string, ConfigValue> configuration = 1;
}
Regarding Schemas, I'm not sure they are useful within the scope of this
PR. I think basic Java Reflection is enough.
Thanks,
Max
On 11.03.19 18:36, Robert Bradshaw wrote:
On Mon, Mar 11, 2019 at 6:05 PM Chamikara Jayalath wrote:
On Mon, Mar 11, 2019 at 9:27 AM Robert Bradshaw wrote:
On Mon, Mar 11, 2019 at 4:37 PM Maximilian Michels wrote:
Just to clarify. What's the reason for including a PROPERTIES enum here instead
of directly making beam_urn a field of ExternalTransformPayload ?
The URN is supposed to be static. We always use the same URN for this
type of external transform. We probably want an additional identifier to
point to the resource we want to configure.
It does feel odd to not use the URN to specify the transform itself,
and embed the true identity in an inner proto. The notion of
"external" is just how it happens to be invoked in this pipeline, not
part of its intrinsic definition. As we want introspection
capabilities in the service, we should be able to use the URN at a top
level and know what kind of payload it expects. I would also like to
see this kind of information populated for non-external transforms, which
could be good for visibility (substitution, visualization, etc.) for
runners and other pipeline-consuming tools.
Like so:
message ExternalTransformPayload {
  enum Enum {
    PROPERTIES = 0
      [(beam_urn) = "beam:external:transform:external_transform:v1"];
  }
  // A fully-qualified identifier, e.g. Java package + class
  string identifier = 1;
I'd rather the identifier have semantic rather than
implementation-specific meaning. e.g. one could imagine multiple
implementations of a given transform that different services could
offer.
  // the format may change to a map of typed values if types are supported
  map<string, string> parameters = 2;
}
The identifier could also be a URN.
Can we change the first version to map<string, bytes>? Otherwise the set of
transforms we can support/test will be very limited.
How do we do that? Do we define a set of standard coders for supported
types? On the Java side we can look up the coder by extracting the field
from the POJO, but we can't do that in Python.
I'll let Reuven comment on the exact relevance and timeline of the Beam
Schema-related work here, but until we have that, we can probably support
the standard set of coders that are well defined here:
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L542
So on the Python side the ExternalTransform can take a list of parameters (of types
that have standard coders) which will be converted to bytes to be sent over the
wire. On the Java side the corresponding standard coders (which are determined by
introspection of the transform builder's payload POJO) can be used to convert bytes
to objects.
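To illustrate the decode side of this standard-coder idea, here is a rough Python sketch. The URNs and codecs are simplified stand-ins; the real point is that both SDKs must agree on one coder set for the roundtrip to work:

```python
# Illustrative decode side of the standard-coder idea: given a coder URN
# and payload bytes, recover the original value. The URNs and decoders
# here are simplified stand-ins for Beam's actual standard coders.

def decode_varint(data):
    # Minimal unsigned varint decoding, mirroring the illustrative encoder.
    result = 0
    shift = 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        shift += 7
    return result

DECODERS = {
    "beam:coder:string_utf8:v1": lambda b: b.decode("utf-8"),
    "beam:coder:varint:v1": decode_varint,
}

def decode_config(config):
    """Both SDKs must agree on this coder set for the roundtrip to work."""
    return {name: DECODERS[urn](payload)
            for name, (urn, payload) in config.items()}

params = decode_config({
    "topic": ("beam:coder:string_utf8:v1", b"events"),
    "partitions": ("beam:coder:varint:v1", b"\x04"),
})
```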
They also need to agree on the field types as well as names, so would
it be a map of name to (type, value) pairs? I'm not sure about the tradeoff
between going further down this road vs. getting schemas up to par in
Python (and, next, Go), and supporting this long-term in parallel to
whatever we come up with for schemas.
Hopefully Beam schema work will give us a more generalized way to convert objects across
languages (for example, Python object -> Python Row + Schema -> Java Row + Schema
-> Java object). Note that we run into the same issue when data tries to cross SDK
boundaries when executing cross-language pipelines.
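As a rough illustration of that object -> Row + Schema -> object path, here is a hedged Python sketch. The schema representation and the JSON-based encoding are made up for illustration and differ from Beam's actual Row/Schema encoding:

```python
# Hedged sketch of carrying a configuration object as a (schema, encoded
# row) pair. The schema format and JSON encoding here are illustrative
# only; Beam's actual Row/Schema encoding differs.
import json

def encode_row(schema, obj):
    # Encode fields in schema order so both sides agree on the layout.
    return json.dumps([obj[name] for name, _type in schema]).encode("utf-8")

def decode_row(schema, payload):
    # Reattach field names (and, in a real system, types) from the schema.
    values = json.loads(payload.decode("utf-8"))
    return {name: value for (name, _type), value in zip(schema, values)}

schema = [("topic", "string"), ("partitions", "int")]
payload = encode_row(schema, {"topic": "events", "partitions": 4})
roundtrip = decode_row(schema, payload)
```

The schema travels with the payload, so the receiving SDK can reconstruct a language-idiomatic object without any out-of-band agreement beyond the schema format itself.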
+1, which is another reason I want to accelerate the language
independence of schemas.
Can we re-use some of the Beam schemas-related work/utilities here ?
Yes, that was the plan.
On this note, Reuven, what is the plan (and timeline) for a
language-independent representation of schemas? The crux of the
problem is that the user needs to specify some kind of configuration
(call it C) to construct the transform (call it T). This would be
handled by a TransformBuilder that provides (at least) a mapping
C -> T. (Possibly this interface could be offered on the transform
itself).
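A minimal Python sketch of that C -> T mapping; all names here (ReadFromKafkaConfig, KafkaReadTransform, TransformBuilder) are hypothetical, not Beam's actual interfaces:

```python
# Hypothetical sketch of the TransformBuilder idea: a builder maps a
# configuration object C to a transform T. All class names are made up
# for illustration.
class ReadFromKafkaConfig:
    # Stand-in for the configuration C (a POJO in Java, perhaps a dict
    # in Python).
    def __init__(self, topic, num_partitions):
        self.topic = topic
        self.num_partitions = num_partitions

class KafkaReadTransform:
    # Stand-in for the expanded transform T.
    def __init__(self, config):
        self.config = config

class TransformBuilder:
    """Provides the C -> T mapping described above."""
    def build(self, config):
        return KafkaReadTransform(config)

transform = TransformBuilder().build(ReadFromKafkaConfig("events", 4))
```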
The question we are trying to answer here is how to represent C, in
both the source and target language, and on the wire. The idea is that
we could leverage the schema infrastructure such that C could be a
POJO in Java (and perhaps a dict in Python). We would want to extend
Schemas and Row (or perhaps a sub/super/sibling class thereof) to
allow for Coder and UDF-typed fields. (Exactly how to represent UDFs
is still very TBD.) The payload for an external transform using this
format would be the tuple (schema, SchemaCoder(schema).encode(C)). The
goal is to not, yet again, invent a cross-language way of defining a
bag of named, typed parameters (aka fields) with language-idiomatic