Hi Mani,

Thanks for debugging this and creating a Jira ticket for it. Yes, it seems that we have a bug there: if a recovered value is present, we need to register the accumulator again. Feel free to contribute a fix and ping me for review.
Alexey

> On 14 Jul 2020, at 11:54, Sunny, Mani Kolbe <[email protected]> wrote:
>
> Hello,
>
> I am running a Beam application in streaming mode with jobName and
> checkPointDir configured. When I recover the application from a planned
> stop, I get a failure.
>
> I did some investigation and noticed that the accumulator is not registered
> when the application recovers from a checkpoint.
>
> Correct me if I am wrong. If you look at the code in the screenshot below,
> from the MetricsAccumulator class in Beam v2.22.0, you can see that a new
> instance of MetricsContainerStepMapAccumulator is registered on line #64.
> But if a recovered value is present, the code constructs a new instance with
> the recovered value (line #78), and this new accumulator instance is never
> registered. This causes the Spark driver to throw:
> java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>
> Created a ticket: https://issues.apache.org/jira/browse/BEAM-10481
>
> Regards,
> Mani
>
> <image001.jpg>
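To make the failure mode concrete, here is a minimal, self-contained Java sketch of the pattern described in the report. It does not use Spark or Beam: `ToyAccumulator`, `ToyContext`, `initBuggy` and `initFixed` are hypothetical stand-ins (in real Spark the registration call is `SparkContext.register`, and the "must be registered" check happens when the accumulator is serialized for executors). The buggy version registers the fresh instance and then silently replaces it with an unregistered one built from the recovered value; the fixed version registers the replacement as well.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical toy model of the BEAM-10481 bug; these are NOT Spark/Beam classes. */
public class RecoveryDemo {

  /** Stand-in for an accumulator that must be registered before it is shipped. */
  static final class ToyAccumulator {
    private final long value;
    private boolean registered = false;

    ToyAccumulator(long value) { this.value = value; }

    long value() { return value; }

    boolean isRegistered() { return registered; }

    /** Mimics the driver-side check that fails for unregistered accumulators. */
    void sendToExecutor() {
      if (!registered) {
        throw new UnsupportedOperationException(
            "Accumulator must be registered before send to executor");
      }
    }
  }

  /** Stand-in for the driver context that tracks registered accumulators by name. */
  static final class ToyContext {
    private final Map<String, ToyAccumulator> accumulators = new HashMap<>();

    void register(ToyAccumulator acc, String name) {
      acc.registered = true;
      accumulators.put(name, acc); // later registrations under the same name win
    }
  }

  static final String NAME = "metrics"; // hypothetical accumulator name

  /** Shape described in the report: only the fresh instance is registered. */
  static ToyAccumulator initBuggy(ToyContext ctx, Optional<Long> recoveredValue) {
    ToyAccumulator acc = new ToyAccumulator(0L);
    ctx.register(acc, NAME);
    if (recoveredValue.isPresent()) {
      // Replaces the registered instance with one that is never registered.
      acc = new ToyAccumulator(recoveredValue.get());
    }
    return acc;
  }

  /** Fixed shape: the instance rebuilt from the checkpoint is registered too. */
  static ToyAccumulator initFixed(ToyContext ctx, Optional<Long> recoveredValue) {
    ToyAccumulator acc = new ToyAccumulator(0L);
    ctx.register(acc, NAME);
    if (recoveredValue.isPresent()) {
      acc = new ToyAccumulator(recoveredValue.get());
      ctx.register(acc, NAME); // register the recovered accumulator again
    }
    return acc;
  }

  public static void main(String[] args) {
    Optional<Long> recovered = Optional.of(42L);
    try {
      initBuggy(new ToyContext(), recovered).sendToExecutor();
    } catch (UnsupportedOperationException e) {
      // prints "buggy: Accumulator must be registered before send to executor"
      System.out.println("buggy: " + e.getMessage());
    }
    ToyAccumulator fixed = initFixed(new ToyContext(), recovered);
    fixed.sendToExecutor(); // no exception: the recovered instance is registered
    System.out.println("fixed: value=" + fixed.value()); // prints "fixed: value=42"
  }
}
```

An equivalent alternative is to decide first which instance to use (fresh or recovered) and register it exactly once, which avoids registering an instance that is immediately discarded.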
