[Go SDK] Error transforming byte array message from kafka inside ParDo

2023-06-06 Thread Leonardo Reis
Hi everyone.
I'm trying to implement a streaming pipeline with the Go SDK on the Dataflow
runner, using kafkaio from the xlang package to read a message from one
topic, transform it, and publish it to another Kafka topic.

My message is a byte array (an Avro message encoded in wire format). If I just
read this message from one topic and publish it to another topic, this
works perfectly. But if I try to transform the message inside a ParDo, or
just send it to a LogFn, I receive the following error message:

"Error message from worker: generic::unknown: process bundle failed for
instruction process_bundle-2-2 using plan S02-54 : while executing Process
for Plan[S02-54]: 2: Discard 3: PCollection[S02-54-pcollection-61] Out:[2]
4: ParDo[debug.printKVFn] Out:[3] Sig: func(context.Context, typex.X,
typex.Y) (typex.X, typex.Y) 1: DataSource[S[ptransform-53@localhost:12371],
0] Out:4
Coder:W;coder-67,N;coder-70>>!GWC
caused by: source failed caused by: source decode failed caused by: error
decoding bool: received invalid value [3]"


The main part of my pipeline is declared this way:

pipeline := beam.NewPipeline()
s := pipeline.Root()

// Consumer settings passed through to the Kafka client.
kafkaConsumerOpts := kafkaio.ConsumerConfigs(
	map[string]string{
		"client.dns.lookup":  "resolve_canonical_bootstrap_servers_only",
		"sasl.jaas.config":   "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"engenharia\" password=\"q1w2e3r4\";",
		"security.protocol":  "SASL_PLAINTEXT",
		"sasl.mechanism":     "PLAIN",
		"group.id":           "pocbeamgo",
		"enable.auto.commit": "false",
		"auto.offset.reset":  "earliest",
	},
)

// Read from the input topic and send every element to LogFn.
read := kafkaio.Read(s, config.ExpansionAddr, config.BootstrapServers,
	[]string{config.InputTopic}, kafkaConsumerOpts)
beam.ParDo0(s, &LogFn{}, read)

And the LogFn:

func init() {
	register.DoFn1x0[[]byte](&LogFn{})
}

// LogFn is a DoFn to log rides.
type LogFn struct{}

// ProcessElement logs each element it receives.
func (fn *LogFn) ProcessElement(elm []byte) {
	log.Infof(context.Background(), "Ride info: %v", elm)
}

// FinishBundle waits a bit so the job server finishes receiving logs.
func (fn *LogFn) FinishBundle() {
	time.Sleep(2 * time.Second)
}
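
To make the payload shape concrete, here is a minimal standard-library sketch
of splitting such a message into header and body. It assumes "wire format"
above means the Confluent Schema Registry framing (one zero magic byte, a
4-byte big-endian schema ID, then the Avro body); the thread does not confirm
this. The name splitWireFormat is hypothetical, and the sample bytes are made
up for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitWireFormat separates the assumed 5-byte header from the Avro body.
// Hypothetical helper; the framing is an assumption, not taken from the thread.
func splitWireFormat(msg []byte) (schemaID uint32, body []byte, err error) {
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than 5-byte header")
	}
	if msg[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte %d", msg[0])
	}
	// Bytes 1..4 carry the schema ID in big-endian order.
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	// Made-up sample: magic byte 0, schema ID 42, two body bytes.
	msg := []byte{0, 0, 0, 0, 42, 0x02, 0x06}
	id, body, err := splitWireFormat(msg)
	fmt.Println(id, body, err)
}
```

A transform inside a ParDo could call a function like this on the value bytes
and then decode the body with whichever Avro library the pipeline uses.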

Could anyone help me with this error? Thanks a lot.

Leonardo Reis.


[Question] Invitation to Join Beam Slack Channel

2023-02-13 Thread Leonardo Reis
Hi, I would like to learn more by joining the Beam Slack channel, but it
seems I can't join with my personal email, and I don't have an @apache.org
email either. Could I get invited to the Slack channel, or is there another
way to join?

Thanks! Regards