Hello,
Thank you very much for the reply. I will try running the second example with
the Dataflow runner. Here is the full output from the first example:

2022/08/02 12:50:19 Executing pipeline with the direct runner.
2022/08/02 12:50:19 Pipeline:
2022/08/02 12:50:19 Nodes: {1: []uint8/bytes GLO}
{2: string/string GLO}
{3: string/string GLO}
{4: KV<string,int64>/KV<string,varint> GLO}
{5: string/string GLO}
{6: string/string GLO}
{7: string/string GLO}
{8: KV<int,string>/KV<int[varintz],string> GLO}
{9: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: 
string/string GLO}]
3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> {3: 
string/string GLO}]
4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: KV<string,int64> 
-> {4: KV<string,int64>/KV<string,varint> GLO}]
5: ParDo [In(Main): KV<string,int64> <- {4: KV<string,int64>/KV<string,varint> 
GLO}] -> [Out: string -> {5: string/string GLO}]
6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: 
string/string GLO}]
7: ParDo [In(Main): string <- {6: string/string GLO}] -> [Out: string -> {7: 
string/string GLO}]
8: ParDo [In(Main): T <- {7: string/string GLO}] -> [Out: KV<int,T> -> {8: 
KV<int,string>/KV<int[varintz],string> GLO}]
9: CoGBK [In(Main): KV<int,string> <- {8: 
KV<int,string>/KV<int[varintz],string> GLO}] -> [Out: CoGBK<int,string> -> {9: 
CoGBK<int,string>/CoGBK<int[varintz],string> GLO}]
10: ParDo [In(Main): CoGBK<int,string> <- {9: 
CoGBK<int,string>/CoGBK<int[varintz],string> GLO}] -> []
2022/08/02 12:50:19 Failed to execute job: translation failed
        caused by:
unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> 
[Out: string -> {6: string/string GLO}]
exit status 1


Also, I’m sorry: I made a mistake when pasting the first example. I used a
custom Prefix transform which uses a different URN. I am pasting the code once
again below so there is no confusion.

package main

import (
        "context"
        "flag"
        "fmt"

        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// readURN is the URN handed to beam.CrossLanguage to select the external
// transform that should be expanded.
// NOTE(review): the identifier says "read" but the value names a "javaprefix"
// transform; presumably a leftover name, confirm before renaming.
const readURN = "beam:transform:org.apache.beam:javaprefix:v1"

// Command-line flags. Each help text describes how main actually uses the
// value (the original texts were a copy-pasted "Pubsub input topic." and the
// string literals were split across lines, which does not compile).
var (
	// input is the file pattern handed to textio.Read.
	input = flag.String("input", "lorem", "File(s) to read.")
	// output is the file name handed to textio.Write.
	output = flag.String("output", "output-go", "Output file to write.")
	// expansionAddr is the expansion service address handed to beam.CrossLanguage.
	expansionAddr = flag.String("expansion_addr", "localhost:3333", "Address of the expansion service.")
)

// main builds and runs a small pipeline: it reads lines from *input, passes
// them through the external Prefix transform (expanded via *expansionAddr),
// prepends "go: " to every element in Go, and writes the result to *output.
//
// NOTE(review): with the direct runner this pipeline was reported to fail with
// "unexpected edge: ... External", so it presumably needs a runner that
// supports cross-language transforms; confirm which runner to use.
func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	withJavaPrefix := Prefix(s, "java:", *expansionAddr, lines)
	// The element parameter is named line so it does not shadow the scope s.
	withGoPrefix := beam.ParDo(s, func(line string) string {
		return fmt.Sprintf("go: %s", line)
	}, withJavaPrefix)
	textio.Write(s, *output, withGoPrefix)

	// Run with the same context that is used for logging below.
	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
func Prefix(s beam.Scope, prefix string, addr string, col beam.PCollection) 
beam.PCollection {
        s = s.Scope("XLangTest.Prefix")

        pl := beam.CrossLanguagePayload(prefixPayload{Prefix: prefix})
        outT := beam.UnnamedOutput(typex.New(reflectx.String))
        outs := beam.CrossLanguage(s, readURN, pl, addr, 
beam.UnnamedInput(col), outT)
        return outs[beam.UnnamedOutputTag()]
}

// prefixPayload is the configuration value that Prefix encodes with
// beam.CrossLanguagePayload and sends along with the external transform.
// Its only field, Prefix, holds the prefix string given to Prefix.
type prefixPayload struct{ Prefix string }

Cheers,
Sergiusz

Reply via email to