Okay so I have gone back and confirmed that the translation failed error also originates from the direct runner ( https://github.com/apache/beam/blob/8da6363b6fef8da5e73976be2b1277a776c05239/sdks/go/pkg/beam/runners/direct/direct.go#L337) so you should try to execute on your runner of choice. I'm going to go in and improve the error message so this issue is more clear moving forward, and there's a tracking issue filed at https://github.com/apache/beam/issues/22560. Let me know if you have any more problems, I'm happy to help!
Thanks, Jack McCluskey On Tue, Aug 2, 2022 at 6:57 AM Sergiusz Rokosz <[email protected]> wrote: > Hello, > Thank you very much for the reply. I will try running the second example > with dataflow runner. Here is the full output from the first example: > > 2022/08/02 12:50:19 Executing pipeline with the direct runner. > 2022/08/02 12:50:19 Pipeline: > 2022/08/02 12:50:19 Nodes: {1: []uint8/bytes GLO} > {2: string/string GLO} > {3: string/string GLO} > {4: KV<string,int64>/KV<string,varint> GLO} > {5: string/string GLO} > {6: string/string GLO} > {7: string/string GLO} > {8: KV<int,string>/KV<int[varintz],string> GLO} > {9: CoGBK<int,string>/CoGBK<int[varintz],string> GLO} > Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}] > 2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: > string/string GLO}] > 3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> > {3: string/string GLO}] > 4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: > KV<string,int64> -> {4: KV<string,int64>/KV<string,varint> GLO}] > 5: ParDo [In(Main): KV<string,int64> <- {4: > KV<string,int64>/KV<string,varint> GLO}] -> [Out: string -> {5: > string/string GLO}] > 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string > -> {6: string/string GLO}] > 7: ParDo [In(Main): string <- {6: string/string GLO}] -> [Out: string -> > {7: string/string GLO}] > 8: ParDo [In(Main): T <- {7: string/string GLO}] -> [Out: KV<int,T> -> {8: > KV<int,string>/KV<int[varintz],string> GLO}] > 9: CoGBK [In(Main): KV<int,string> <- {8: > KV<int,string>/KV<int[varintz],string> GLO}] -> [Out: CoGBK<int,string> -> > {9: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}] > 10: ParDo [In(Main): CoGBK<int,string> <- {9: > CoGBK<int,string>/CoGBK<int[varintz],string> GLO}] -> [] > 2022/08/02 12:50:19 Failed to execute job: translation failed > caused by: > unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] > -> [Out: string -> {6: string/string GLO}] > exit status 1 > > > And also I’m sorry, I made a mistake when pasting the first example. I > used a custom Prefix transform which uses a different urn. I will paste it > once again so there is no confusion. > > package main > > import ( > "context" > "flag" > "fmt" > > "github.com/apache/beam/sdks/v2/go/pkg/beam" > "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" > "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" > "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" > "github.com/apache/beam/sdks/v2/go/pkg/beam/log" > "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" > ) > > const readURN = "beam:transform:org.apache.beam:javaprefix:v1" > > var ( > input = flag.String("input", "lorem", "Pubsub input > topic.") > output = flag.String("output", "output-go", "Pubsub input > topic.") > expansionAddr = flag.String("expansion_addr", "localhost:3333", > "Pubsub input topic.") > ) > > func main() { > flag.Parse() > beam.Init() > ctx := context.Background() > > p := beam.NewPipeline() > s := p.Root() > > lines := textio.Read(s, *input) > withJavaPrefix := Prefix(s, "java:", *expansionAddr, lines) > withGoPrefix := beam.ParDo(s, func(s string) string { return > fmt.Sprintf("go: %s", s) }, withJavaPrefix) > textio.Write(s, *output, withGoPrefix) > > if err := beamx.Run(context.Background(), p); err != nil { > log.Exitf(ctx, "Failed to execute job: %v", err) > } > } > func Prefix(s beam.Scope, prefix string, addr string, col > beam.PCollection) beam.PCollection { > s = s.Scope("XLangTest.Prefix") > > pl := beam.CrossLanguagePayload(prefixPayload{Prefix: prefix}) > outT := beam.UnnamedOutput(typex.New(reflectx.String)) > outs := beam.CrossLanguage(s, readURN, pl, addr, > beam.UnnamedInput(col), outT) > return outs[beam.UnnamedOutputTag()] > } > > type prefixPayload struct { > Prefix string > } > > Cheers, > Sergiusz
