Hello,
I was trying to recreate java prefix pipeline from
https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
<https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/>
but with go sdk. My pipeline code is:
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/examples/xlang
<http://github.com/apache/beam/sdks/v2/go/examples/xlang>"
"github.com/apache/beam/sdks/v2/go/pkg/beam
<http://github.com/apache/beam/sdks/v2/go/pkg/beam>"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex>"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx>"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio>"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/log>"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx
<http://github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx>"
)
const readURN = "beam:transform:org.apache.beam:javaprefix:v1"
var (
input = flag.String("input", "lorem", "Pubsub input topic.")
output = flag.String("output", "output-go", "Pubsub input
topic.")
expansionAddr = flag.String("expansion_addr", "localhost:3333", "Pubsub
input topic.")
)
func main() {
flag.Parse()
beam.Init()
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, *input)
withJavaPrefix := xlang.Prefix(s, "java:", *expansionAddr, lines)
withGoPrefix := beam.ParDo(s, func(s string) string { return
fmt.Sprintf("go: %s", s) }, withJavaPrefix)
textio.Write(s, *output, withGoPrefix)
if err := beamx.Run(context.Background(), p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}
func Prefix(s beam.Scope, prefix string, addr string, col beam.PCollection)
beam.PCollection {
s = s.Scope("XLangTest.Prefix")
pl := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
outT := beam.UnnamedOutput(typex.New(reflectx.String))
outs := beam.CrossLanguage(s, "beam:transforms:xlang:test:prefix", pl,
addr, beam.UnnamedInput(col), outT)
return outs[beam.UnnamedOutputTag()]
}
type prefixPayload struct {
Data string
}
I’ve downloaded the 2.40 version of expansion-service and started it with 'java
-jar beam-examples-multi-language-2.40.0.jar 3333
--javaClassLookupAllowlistFile=‘*’’.
When i run the pipeline I get error: unexpected edge: 6: External [In(Main):
string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
I would like to ask what is the problem. I dont have a clue what might be wrong.
Cheers,
Sergiusz