Hi Thomas,

Could you add a sketch of your preferred solution? From what I gathered, you have all the information available in your main() (I may have misunderstood that), so what's keeping you from adding the TypeSerializer as a field to your UDF?
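For illustration, a minimal sketch of what I mean, assuming the serializer is built from the registry endpoint in main(). RegistryAwareMap and MyEvent are made-up names; the only real requirement is that TypeSerializer implements Serializable, which it does:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.core.memory.DataInputDeserializer;

    // Hypothetical UDF carrying a TypeSerializer that was built at graph
    // construction time, e.g. from the schema registry endpoint known in
    // main(). Since TypeSerializer implements Serializable, the field
    // ships to the task managers together with the function.
    public class RegistryAwareMap extends RichMapFunction<byte[], MyEvent> {

        private final TypeSerializer<MyEvent> serializer;

        public RegistryAwareMap(TypeSerializer<MyEvent> serializer) {
            this.serializer = serializer;
        }

        @Override
        public MyEvent map(byte[] raw) throws Exception {
            // deserialize with the serializer captured at submission time
            return serializer.deserialize(new DataInputDeserializer(raw));
        }
    }

(MyEvent is a placeholder for the actual event type.)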
On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> In a past project I was able to use Spring as a DI provider for Flink
> jobs. It actually saved me a lot of hassle while writing/composing jobs
> and process functions.
> I was able to use all of Spring's bean annotations, along with property
> files managed by Spring, just as in a "normal" Spring app. The
> dependencies that I was injecting via Spring were not
> serialized/deserialized by Flink, which was actually something I wanted
> to achieve. In some cases it is very hard, or maybe even impossible, to
> make some 3rd-party classes serializable.
>
> Things to highlight here:
> 1. I did it only for the Stream API; I think it could also work for the
> Table API, though.
> 2. I was loading the Spring context from the ProcessFunction::open
> method. I was able to customize via job parameters which Spring
> configuration I wanted to load. After doing this, all fields annotated
> with @Autowired were injected.
> 3. I was using standard @Configuration classes.
>
> Issues:
> 1. Since I was using the operator's open method to load the context, the
> context will be loaded several times, depending on the number of
> operators deployed on a particular Task Manager. This, however, could be
> improved.
> 2. The important thing here is that all your classes have to be
> "deployed" on every Task Manager/Job Manager in order to load them
> through DI. We achieved this by using what is called a "job session"
> cluster, where our custom Flink Docker image was built so that it
> contains our job jar with all the needed dependencies.
>
> Because of that, we were not able to use things like AWS EMR or Kinesis.
>
> Cheers,
> Krzysztof Chmielewski
>
> On Tue, Nov 9, 2021 at 6:46 AM Thomas Weise <t...@apache.org> wrote:
>
>> Hi,
>>
>> I was looking into a problem that requires a configurable type
>> serializer for communication with a schema registry. The service
>> endpoint can change, so I would not want to make it part of the
>> serializer snapshot, but rather resolve it at graph construction time
>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
>> be embedded into a checkpoint).
>>
>> TypeSerializer is instantiated via either TypeInformation or
>> TypeSerializerSnapshot. While TypeInformation provides access to
>> ExecutionConfig, and therefore the ability to access parameters from
>> GlobalJobParameters that could be provided through the entry point,
>> restoreSerializer requires the serializer to be constructed from the
>> snapshot state alone.
>>
>> Ideally there would be a dependency injection mechanism for user code.
>> The discussion in [1] indicated there isn't a direct solution. Has
>> anyone come across a similar use case and found a way to work around
>> this limitation? It might be possible to work with a configuration
>> singleton that initializes from a file in a well-known location, but
>> that depends on the deployment environment and doesn't play nicely with
>> testing.
>>
>> Thanks,
>> Thomas
>>
>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>>
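For reference, a minimal sketch of the open()-based injection Krzysztof describes in point 2, assuming the injection is done through Spring's AutowireCapableBeanFactory; EnrichingFunction, MyService, and MyAppConfig are placeholders for application classes:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    // Hypothetical ProcessFunction that bootstraps a Spring context in
    // open() and injects its own fields. MyService and MyAppConfig stand
    // in for real application classes.
    public class EnrichingFunction extends ProcessFunction<String, String> {

        // transient: provided by Spring on the Task Manager, never
        // serialized/deserialized by Flink
        @Autowired
        private transient MyService service;

        @Override
        public void open(Configuration parameters) {
            // which @Configuration class to load could be chosen from
            // job parameters, as described in point 2 above
            AnnotationConfigApplicationContext springCtx =
                    new AnnotationConfigApplicationContext(MyAppConfig.class);
            // inject the @Autowired fields of this (non-bean) instance
            springCtx.getAutowireCapableBeanFactory().autowireBean(this);
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(service.enrich(value));
        }
    }

Holding the context in a static field would load it once per Task Manager JVM rather than once per operator, which is the improvement issue 1 above hints at.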
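And a sketch of the TypeInformation route Thomas mentions, i.e. reading the endpoint from GlobalJobParameters inside createSerializer(ExecutionConfig). RegistryTypeInfo, RegistrySerializer, MyEvent, and the "registry.url" key are all made up; the assumption is that main() calls env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args)):

    import java.util.Map;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;

    // Hypothetical TypeInformation that resolves the schema registry
    // endpoint at graph construction time from GlobalJobParameters.
    public class RegistryTypeInfo extends TypeInformation<MyEvent> {

        @Override
        public TypeSerializer<MyEvent> createSerializer(ExecutionConfig config) {
            Map<String, String> params = config.getGlobalJobParameters().toMap();
            // "registry.url" is a made-up parameter key
            return new RegistrySerializer(params.get("registry.url"));
        }

        // boilerplate required by the TypeInformation contract
        @Override public boolean isBasicType() { return false; }
        @Override public boolean isTupleType() { return false; }
        @Override public int getArity() { return 1; }
        @Override public int getTotalFields() { return 1; }
        @Override public Class<MyEvent> getTypeClass() { return MyEvent.class; }
        @Override public boolean isKeyType() { return false; }
        @Override public String toString() { return "RegistryTypeInfo"; }
        @Override public boolean equals(Object obj) { return obj instanceof RegistryTypeInfo; }
        @Override public int hashCode() { return RegistryTypeInfo.class.hashCode(); }
        @Override public boolean canEqual(Object obj) { return obj instanceof RegistryTypeInfo; }
    }

Note this only covers the construction path; as Thomas points out, restoreSerializer still has to rebuild the serializer from the snapshot state alone, which is exactly the remaining limitation.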