Essentially I have the following code:
type Writer struct {
Pool WriterPool
}
func (w *Writer) Setup() {
w.Pool = Init()
}
func (w* Writer) ProcessElement(ctx, elem Elem) {
w.Pool.Add(elem)
}
func (w* Writer) Teardown() {
w.Pool.Write()
w.Pool.Close()
}
beam.ParDo0(scope, &Writer{}, elemCollection)
The above code runs fine with the direct runner but not with dataflow.
I added log lines to the above methods, and the ones in Teardown() never
appear in the logs.
If I change my code as follows:
func (w* Writer) ProcessElement(ctx, elem Elem) {
w.Pool.Add(elem)
w.Pool.Write()
}
Then I see the data being written, but I lose the ability to pool, plus I am
leaking connections.
Is this a known issue, or I am going something wrong?
Thanks again for the help.