Got it. Thanks for the explanation.
On 2018/06/18 21:40:48, Robert Burke <[email protected]> wrote:
> Hello, and thanks for trying out the Go SDK!
>
> tl;dr: You can bypass the Beam correctness checks by pretending your type
> is a Protocol Buffer. See create_test.go
> <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
> for an example of what needs to be done. This is not recommended, but it works.
>
> To answer your questions:
> You're right: at present, the Go SDK is not happy with the map type as an
> element value. As a rule, it tries to avoid free-floating pointers and
> reference types (like interface{}, slices, and maps), and some will likely
> never be allowed (like channels). The reason is that they open up the risk
> of modifying the value *after* it has been emitted, which leads to
> unpredictable bugs.
>
> It's, at best, a feature that's not presently implemented, due to the
> danger mentioned above.
>
> You are not doing anything wrong. AFAICT your pipeline is correct.
>
> There is a workaround, but we can't guarantee it is a viable long-term
> solution, since it depends on a quirk of protocol buffers.
>
> Essentially, you can convince both Beam and the protocol buffer package
> that your custom type is a proto, and write your own Marshal and Unmarshal
> methods to convert it to a []byte (though anything that can do that should
> work).
>
> See create_test.go
> <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
> for an example of what needs to be done.
>
> When the Go SDK has a proper coder registry or similar, we might have a
> better solution, but the danger will still be there.
>
> Please let me know if you need help.
>
> Cheers,
> Robert Burke
>
> On 2018/06/18 21:10:33, [email protected] <[email protected]> wrote:
> > I have the following code:
> >
> > type Record struct {
> >     Timestamp time.Time
> >     Payload   string
> > }
> >
> > type processFn struct {
> >     // etc...
> > }
> >
> > func (f *processFn) ProcessElement(ctx context.Context, data []byte, emit func(Record)) {
> >     // etc...
> >     emit(someRecord)
> >     // etc...
> > }
> >
> > Which is eventually invoked as:
> > beam.ParDo(scope, &processFn{}, pcoll)
> >
> > This seems to work fine using the direct runner until I add a map to the
> > Record struct as follows:
> >
> > type Record struct {
> >     Timestamp time.Time
> >     Payload   string
> >     Labels    map[string]string
> > }
> >
> > Then I get the error mentioned in the subject line.
> >
> > My questions are:
> > - Are maps illegal? What is a legal structure? or
> > - Is the feature yet to be implemented? or
> > - Am I doing something wrong? Am I failing to set up my pipeline correctly?
> >
> > Thanks for your help.
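To make the risk Robert describes concrete, here is a small, stdlib-only sketch. The `bufferingEmitter` type and the `processElements`/`processElementsSafe`/`cloneLabels` helpers are hypothetical: `bufferingEmitter` stands in for any runner that holds on to an element after `emit` returns instead of encoding it immediately, and it is not how the direct runner or any other Beam runner is actually implemented. Because a Go map value is a reference to shared storage, copying the `Record` struct does not copy its labels.

```go
package main

// Record is a trimmed version of the struct from the question, keeping the
// map field that the Go SDK rejects.
type Record struct {
	Payload string
	Labels  map[string]string
}

// bufferingEmitter is a HYPOTHETICAL stand-in for a runner that keeps
// emitted elements around (e.g. batching them) before encoding them.
type bufferingEmitter struct {
	buffered []Record
}

func (b *bufferingEmitter) emit(r Record) {
	// Copying the struct copies the map reference, not the map entries,
	// so the buffered Record shares storage with the caller's map.
	b.buffered = append(b.buffered, r)
}

// processElements emits a record and then, like a careless DoFn, reuses
// and mutates the same map for the next element.
func processElements(out *bufferingEmitter) {
	labels := map[string]string{"env": "dev"}
	out.emit(Record{Payload: "first", Labels: labels})

	// Mutating after emit silently rewrites the element already emitted.
	labels["env"] = "prod"
	out.emit(Record{Payload: "second", Labels: labels})
}

// cloneLabels returns an independent copy of m.
func cloneLabels(m map[string]string) map[string]string {
	c := make(map[string]string, len(m))
	for k, v := range m {
		c[k] = v
	}
	return c
}

// processElementsSafe is the same DoFn logic, but each emitted Record
// gets its own copy of the labels, so later mutation cannot leak into it.
func processElementsSafe(out *bufferingEmitter) {
	labels := map[string]string{"env": "dev"}
	out.emit(Record{Payload: "first", Labels: cloneLabels(labels)})

	labels["env"] = "prod"
	out.emit(Record{Payload: "second", Labels: cloneLabels(labels)})
}
```

With `processElements`, the first buffered element ends up with `env=prod` even though it was emitted with `env=dev`; with `processElementsSafe` it keeps `env=dev`. Copying before emitting is a discipline the DoFn author has to follow by hand, which is why the SDK prefers to reject such types outright.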
