Hello,
I was trying to recreate the Java prefix pipeline from
https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
but with the Go SDK. My pipeline code is:

package main

import (
        "context"
        "flag"
        "fmt"

        "github.com/apache/beam/sdks/v2/go/examples/xlang 
<http://github.com/apache/beam/sdks/v2/go/examples/xlang>"
        "github.com/apache/beam/sdks/v2/go/pkg/beam 
<http://github.com/apache/beam/sdks/v2/go/pkg/beam>"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex 
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex>"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx 
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx>"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio 
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio>"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log 
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/log>"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx 
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx>"
)

// readURN holds the URN string of the Java prefix transform.
// NOTE(review): no code shown in this file references this constant.
const readURN = "beam:transform:org.apache.beam:javaprefix:v1"

// Command-line flags that configure the pipeline built in main.
var (
	// input is passed to textio.Read as the source of the lines to process.
	input = flag.String("input", "lorem", "Input file to read lines from.")
	// output is passed to textio.Write as the destination for the results.
	output = flag.String("output", "output-go", "Output file to write results to.")
	// expansionAddr is passed to xlang.Prefix as the expansion service address.
	expansionAddr = flag.String("expansion_addr", "localhost:3333", "Address (host:port) of the expansion service.")
)

// main builds and runs the pipeline: it reads lines from *input, applies the
// cross-language prefix transform with the "java:" prefix through the
// expansion service at *expansionAddr, prepends "go: " to every resulting
// element in a Go ParDo, and writes the result to *output.
//
// The cross-language step is xlang.Prefix from the imported examples package,
// not the Prefix function declared in this file.
func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	withJavaPrefix := xlang.Prefix(s, "java:", *expansionAddr, lines)
	// The element parameter is named line so it does not shadow the scope s.
	withGoPrefix := beam.ParDo(s, func(line string) string {
		return fmt.Sprintf("go: %s", line)
	}, withJavaPrefix)
	textio.Write(s, *output, withGoPrefix)

	// NOTE(review): the reported "unexpected edge: ... External" failure is
	// presumably raised by the runner selected here rejecting the
	// cross-language edge; confirm which runners support external transforms
	// and choose one via the --runner flag.
	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
func Prefix(s beam.Scope, prefix string, addr string, col beam.PCollection) 
beam.PCollection {
        s = s.Scope("XLangTest.Prefix")

        pl := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
        outT := beam.UnnamedOutput(typex.New(reflectx.String))
        outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:prefix", pl, 
addr, beam.UnnamedInput(col), outT)
        return outs[beam.UnnamedOutputTag()]
}

// prefixPayload is the configuration value that Prefix hands to
// beam.CrossLanguagePayload when building the external transform.
// NOTE(review): the field name is presumably part of the encoded payload that
// the expansion service reads — confirm before renaming it.
type prefixPayload struct {
        // Data carries the prefix string supplied by the caller of Prefix.
        Data string
}


I’ve downloaded version 2.40 of the expansion service and started it with
'java -jar beam-examples-multi-language-2.40.0.jar 3333
--javaClassLookupAllowlistFile='*''.

When I run the pipeline I get the error: unexpected edge: 6: External [In(Main): 
string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
I would like to ask what the problem is. I don't have a clue what might be wrong.
Cheers,
Sergiusz

Reply via email to