Hi everyone. I'm trying to implement a streaming pipeline with the Go SDK on the Dataflow runner, using kafkaio from the xlang package to read a message from one Kafka topic, transform it, and publish it to another Kafka topic.
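To make the transform step concrete, here is a minimal sketch of the kind of byte-level handling involved. It assumes the messages use the Confluent wire format (one zero magic byte, a 4-byte big-endian schema ID, then the Avro body). splitWireFormat is a hypothetical helper name used only for illustration; it is not taken from the pipeline code and it does not use any Beam API.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitWireFormat is a hypothetical helper. It assumes the Confluent wire
// format: magic byte 0, a 4-byte big-endian schema ID, then the Avro body.
func splitWireFormat(msg []byte) (schemaID uint32, payload []byte, err error) {
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than the 5-byte header")
	}
	if msg[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte %d", msg[0])
	}
	// Bytes 1..4 hold the schema ID; everything after is the Avro payload.
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	// Made-up sample: schema ID 42 followed by two payload bytes.
	msg := []byte{0, 0, 0, 0, 42, 0x02, 0x06}
	id, payload, err := splitWireFormat(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("schema ID %d, %d payload bytes\n", id, len(payload))
}
```

Keeping this logic in a plain function, separate from any DoFn, means it can be checked locally without starting the pipeline.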
My message is a byte array (an Avro message encoded in wire format). If I just read this message from one topic and publish it to another topic, everything works perfectly. But if I try to transform the message inside a ParDo, or just send it to a LogFn, I receive the following error message:

    Error message from worker: generic::unknown: process bundle failed for instruction process_bundle-2-2 using plan S02-54 : while executing Process for Plan[S02-54]:
    2: Discard
    3: PCollection[S02-54-pcollection-61] Out:[2]
    4: ParDo[debug.printKVFn] Out:[3] Sig: func(context.Context, typex.X, typex.Y) (typex.X, typex.Y)
    1: DataSource[S[ptransform-53@localhost:12371], 0] Out:4 Coder:W;coder-67<KV;coder-68<N;coder-69<bytes;npbayBrypnByteArrayCoder>,N;coder-70<bytes;npbayBrypnByteArrayCoder>>>!GWC
    caused by: source failed
    caused by: source decode failed
    caused by: error decoding bool: received invalid value [3]

The main part of my pipeline is declared this way:

    pipeline := beam.NewPipeline()
    s := pipeline.Root()

    kafkaConsumerOpts := kafkaio.ConsumerConfigs(
        map[string]string{
            "client.dns.lookup":  "resolve_canonical_bootstrap_servers_only",
            "sasl.jaas.config":   "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"engenharia\" password=\"q1w2e3r4\";",
            "security.protocol":  "SASL_PLAINTEXT",
            "sasl.mechanism":     "PLAIN",
            "group.id":           "pocbeamgo",
            "enable.auto.commit": "false",
            "auto.offset.reset":  "earliest",
        },
    )

    read := kafkaio.Read(s, config.ExpansionAddr, config.BootstrapServers, []string{config.InputTopic}, kafkaConsumerOpts)
    beam.ParDo0(s, &LogFn{}, read)

And this is the LogFn:

    func init() {
        register.DoFn1x0[[]byte](&LogFn{})
    }

    // LogFn is a DoFn to log rides.
    type LogFn struct{}

    // ProcessElement logs each element it receives.
    func (fn *LogFn) ProcessElement(elm []byte) {
        log.Infof(context.Background(), "Ride info: %v", elm)
    }

    // FinishBundle waits a bit so the job server finishes receiving logs.
    func (fn *LogFn) FinishBundle() {
        time.Sleep(2 * time.Second)
    }

Could anyone help me with this error?

Thanks a lot.
Leonardo Reis.