Hello, and thanks for trying out the Go SDK!
tl;dr: You can bypass the Beam correctness checks by pretending your type
is a Protocol Buffer. See create_test.go
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
for an example of what needs to be done. This is not recommended, but it works.
To answer your questions:
You're right: at present, the Go SDK does not accept the map type as an
element value. As a rule, it tries to avoid free-floating pointers
and reference types (like interface{}, slices, and maps), and some will
likely never be allowed (like channels). The reason is that they open up the
risk of modifying a value *after* it has been emitted, which leads to
unpredictable bugs.
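
To make that risk concrete, here is a small sketch in plain Go with no Beam
involved. The collect function is a hypothetical stand-in for an emit
function that simply buffers what it is given; it is not how any runner is
actually implemented. The point is only that a map field is shared, not
copied, when the struct is passed along.

```go
package main

import "fmt"

// Record mirrors the struct from the question, minus the timestamp.
type Record struct {
	Payload string
	Labels  map[string]string
}

// collect is a hypothetical stand-in for an emit function: it keeps
// whatever it is handed in a buffer.
func collect(buf *[]Record) func(Record) {
	return func(r Record) { *buf = append(*buf, r) }
}

// emitThenMutate emits a record and then changes its map afterwards.
func emitThenMutate() []Record {
	var buf []Record
	emit := collect(&buf)

	labels := map[string]string{"env": "prod"}
	emit(Record{Payload: "a", Labels: labels})

	// The struct was copied on emit, but the copy's Labels field still
	// refers to the same underlying map, so this write is visible in the
	// element that was already emitted.
	labels["env"] = "test"
	return buf
}

func main() {
	out := emitThenMutate()
	fmt.Println(out[0].Labels["env"]) // prints "test", not "prod"
}
```

With only value fields such as strings or ints, the emitted copy would be
unaffected by later writes.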
At best, it is a feature that is not presently implemented, because of the
danger mentioned above.
You are not doing anything wrong. AFAICT your pipeline is correct.
There is a workaround, but we can't guarantee it is a viable long-term
solution, since it depends on a quirk of protocol buffers.
Essentially, you can convince both Beam and the protocol buffer package
that your custom type is a proto, and write your own Marshal and Unmarshal
methods to convert it to and from a []byte (anything that can do that should
work).
See create_test.go
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
for an example of what needs to be done.
When the Go SDK has a proper Coder Registry or similar, we might have a
better solution, but the danger will still be there.
Please let me know if you need help.
Cheers,
Robert Burke
On 2018/06/18 21:10:33, [email protected] <[email protected]> wrote:
> I have the following code:
>
> type Record struct {
>     Timestamp time.Time
>     Payload   string
> }
>
> type processFn struct {
>     // etc...
> }
>
> func (f *processFn) ProcessElement(ctx context.Context, data []byte, emit func(Record)) {
>     // etc..
>     emit(someRecord)
>     // etc...
> }
>
> Which is eventually invoked as:
> beam.ParDo(scope, &processFn{}, pcoll)
>
> This seems to work fine using the direct runner until I add a map to the
> Record struct as follows:
>
> type Record struct {
>     Timestamp time.Time
>     Payload   string
>     Labels    map[string]string
> }
>
> Then I get the error mentioned in the subject line.
>
> My questions are:
> - Are maps illegal? What is a legal structure? or
> - Is the feature yet to be implemented? or
> - Am I doing something wrong? Am I failing to set up my pipeline
>   correctly?
>
> Thanks for your help.
>