Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-16 Thread Alexey Romanenko
Thanks Evan for getting back, and great that it was resolved by configuration tweaks!
Alexey

> On 16 Sep 2022, at 16:46, Evan Galpin wrote:
>
> Following up to close the loop. I believe the Kafka errors I was seeing were
> a red herring. The actual root cause of the issues was worker

Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-16 Thread Evan Galpin
Following up to close the loop. I believe the Kafka errors I was seeing were a red herring. The actual root cause of the issues was worker nodes running out of memory; as a result, the Kafka producers had difficulty competing for resources amid GC thrashing. Increasing the worker node

Re: [Question] [Go SDK] Generic Register DoFn having iter(*interface{})

2022-09-16 Thread Danny McCormick via user
Ah, I see - I didn't realize the problem was happening post-registration. This error originates from https://github.com/apache/beam/blob/8b2676782a62f8bdf912395267056c9f37251338/sdks/go/pkg/beam/core/runtime/graphx/serialize.go#L502 and basically means that we are not able to infer a default coder

Re: [Question] [Go SDK] Generic Register DoFn having iter(*interface{})

2022-09-16 Thread Rener Pereira De Castro via user
Hi Danny, thank you for your answer. The code below doesn’t work for me:

register.Iter1[interface{}]()
register.DoFn3x1[context.Context, int, func(*interface{}) bool, error]({})

I'm having this error when trying to run on Dataflow: ``` bad input type caused by: encoding full

Re: [Question] [Go SDK] Generic Register DoFn having iter(*interface{})

2022-09-16 Thread Danny McCormick via user
Hey Rener, you should be able to register that function with `register.DoFn3x1[context.Context, int, func(*interface{}) bool, error]`. You would use DoFn3x1 because you have 3 inputs and 1 output in your ProcessElement, and then the constraints are just the input types to your ProcessElement
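
The counting rule described above (three inputs and one output, hence DoFn3x1) can be sketched with the standard library alone. This is only an illustration: `registerFuncName` is a hypothetical helper, not part of the Beam Go SDK, and the `parquetWriteFn` here is a stub that merely copies the ProcessElement signature quoted in this thread. It counts the parameters and return values of ProcessElement with `reflect` and prints the matching register function name.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// parquetWriteFn is a stand-in stub (an assumption for illustration); only its
// ProcessElement signature is taken from the thread.
type parquetWriteFn struct{}

// ProcessElement takes 3 inputs (context, key, iterator) and returns 1 output (error).
func (f *parquetWriteFn) ProcessElement(ctx context.Context, _ int, iter func(*interface{}) bool) error {
	return nil
}

// registerFuncName is a hypothetical helper: it derives the "DoFn<in>x<out>"
// name from the arity of the ProcessElement method on fn.
func registerFuncName(fn interface{}) string {
	t := reflect.TypeOf(fn)
	if t == nil {
		return ""
	}
	m, ok := t.MethodByName("ProcessElement")
	if !ok {
		return ""
	}
	// m.Type lists the receiver as its first input, so subtract one.
	return fmt.Sprintf("DoFn%dx%d", m.Type.NumIn()-1, m.Type.NumOut())
}

func main() {
	// The thread's suggestion for this signature is
	// register.DoFn3x1[context.Context, int, func(*interface{}) bool, error].
	fmt.Println(registerFuncName(&parquetWriteFn{})) // prints DoFn3x1
}
```

The type constraints passed to the register function would then be the three input types followed by the one output type, in the order they appear in ProcessElement.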

[Question] [Go SDK] Generic Register DoFn having iter(*interface{})

2022-09-16 Thread Rener Pereira De Castro via user
Hi guys, I can't find a way to use generic registration for a DoFn whose ProcessElement looks like the one in parquetWriteFn: ProcessElement(ctx context.Context, _ int, iter func(*interface{}) bool)