Hi Thomas,

Could you add a sketch of your preferred solution? From what I gathered,
you have all the information available in your main method (I probably
misunderstood that), so what's keeping you from adding the TypeSerializer
as a field to your UDF?
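
For illustration, a minimal sketch of what I mean, assuming the serializer
can be fully configured at graph construction time (MyEvent and the helper
around it are made up):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    // Hypothetical UDF that carries a pre-configured serializer as a plain
    // field. TypeSerializer implements java.io.Serializable, so the
    // configured instance is shipped with the UDF; no snapshot round-trip
    // is involved.
    public class MyMapFunction extends RichMapFunction<MyEvent, byte[]> {

        private final TypeSerializer<MyEvent> serializer;

        public MyMapFunction(TypeSerializer<MyEvent> serializer) {
            this.serializer = serializer;
        }

        @Override
        public byte[] map(MyEvent value) throws Exception {
            // Serialize with the injected serializer, resolved in main().
            DataOutputSerializer out = new DataOutputSerializer(64);
            serializer.serialize(value, out);
            return out.getCopyOfBuffer();
        }
    }

You would resolve the registry endpoint in main() and pass the configured
serializer in, e.g. stream.map(new MyMapFunction(buildSerializer(endpoint))).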

On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> In a past project I was able to use Spring as a DI provider for Flink
> jobs. It actually saved me a lot of hassle while writing/composing jobs
> and process functions.
> I was able to use all of Spring's bean annotations along with properties
> files managed by Spring, as if it were a "normal" Spring app. The
> dependencies that I was injecting via Spring were not
> serialized/deserialized by Flink, which was actually something that I
> wanted to achieve. In some cases it is very hard, or maybe even
> impossible, to make some 3rd-party classes serializable.
>
> Things to highlight here:
> 1. I did it only for the Stream API; I think it could also work for the
> Table API, though.
> 2. I was loading the Spring context from the ProcessFunction::open method.
> I was able to customize, via job parameters, which Spring configuration I
> wanted to load.
> After doing that, all fields annotated with @Autowired were injected (a
> sketch follows this list).
> 3. I was using standard @Configuration classes.
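>
> A rough sketch of the open() pattern above (the class names and the
> client bean with its enrich method are made up):
>
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.functions.ProcessFunction;
>     import org.apache.flink.util.Collector;
>     import org.springframework.beans.factory.annotation.Autowired;
>     import org.springframework.context.annotation.AnnotationConfigApplicationContext;
>
>     public class MyProcessFunction extends ProcessFunction<String, String> {
>
>         // Injected by Spring on the Task Manager, never serialized by Flink.
>         @Autowired
>         private transient SomeThirdPartyClient client;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             // Load the Spring context on the worker and wire @Autowired fields.
>             AnnotationConfigApplicationContext springCtx =
>                     new AnnotationConfigApplicationContext(MyJobConfiguration.class);
>             springCtx.getAutowireCapableBeanFactory().autowireBean(this);
>         }
>
>         @Override
>         public void processElement(String value, Context ctx, Collector<String> out) {
>             out.collect(client.enrich(value));
>         }
>     }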
>
> Issues:
> 1. Since I was using the operator::open method to load the context, the
> context will be loaded several times, depending on the number of operators
> deployed on a particular Task Manager. This, however, could be improved
> (see the sketch below).
> 2. The important thing here was that all your classes have to be
> "deployed" on every Task Manager/Job Manager in order to load them through
> DI.
> We achieved this by using what is called a "job session" cluster, where our
> custom Flink docker image was built in a way that it contains our job jar
> with all the dependencies needed.
>
> Because of that, we were not able to use things like AWS EMR or Kinesis.
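>
> For issue 1, one improvement would be a per-JVM holder, so the context is
> created once per Task Manager instead of once per operator (a sketch; the
> class name is made up):
>
>     import org.springframework.context.annotation.AnnotationConfigApplicationContext;
>
>     public final class SpringContextHolder {
>
>         private static volatile AnnotationConfigApplicationContext context;
>
>         // Lazily create one shared context per JVM (double-checked locking).
>         public static AnnotationConfigApplicationContext get(Class<?> configClass) {
>             if (context == null) {
>                 synchronized (SpringContextHolder.class) {
>                     if (context == null) {
>                         context = new AnnotationConfigApplicationContext(configClass);
>                     }
>                 }
>             }
>             return context;
>         }
>     }
>
> open() would then call SpringContextHolder.get(MyJobConfiguration.class)
> instead of creating its own context.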
>
> Cheers,
> Krzysztof Chmielewski
>
> On Tue, Nov 9, 2021 at 06:46 Thomas Weise <t...@apache.org> wrote:
>
>> Hi,
>>
>> I was looking into a problem that requires a configurable type
>> serializer for communication with a schema registry. The service
>> endpoint can change, so I would not want to make it part of the
>> serializer snapshot but rather resolve it at graph construction time
>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
>> be embedded into a checkpoint).
>>
>> TypeSerializer is instantiated via either TypeInformation or
>> TypeSerializerSnapshot. While TypeInformation provides access to the
>> ExecutionConfig, and therefore the ability to read parameters from
>> GlobalJobParameters that could be provided through the entry point,
>> restoreSerializer requires the serializer to be constructed from the
>> snapshot state alone.
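>>
>> To make the two paths concrete (the serializer, event type, and parameter
>> name are made up, and most required methods are omitted):
>>
>>     import org.apache.flink.api.common.ExecutionConfig;
>>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>>     import org.apache.flink.api.common.typeutils.TypeSerializer;
>>     import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
>>
>>     // Path 1: graph construction. ExecutionConfig, and with it
>>     // GlobalJobParameters, is available, so the endpoint can be resolved.
>>     public class MyTypeInformation extends TypeInformation<MyEvent> {
>>         @Override
>>         public TypeSerializer<MyEvent> createSerializer(ExecutionConfig config) {
>>             String endpoint = config.getGlobalJobParameters()
>>                     .toMap()
>>                     .get("schema.registry.url"); // assumed parameter name
>>             return new MySerializer(endpoint);
>>         }
>>         // ... remaining TypeInformation methods omitted
>>     }
>>
>>     // Path 2: restore. Only the snapshot state is available; there is no
>>     // hook here to re-resolve the endpoint.
>>     public class MySerializerSnapshot implements TypeSerializerSnapshot<MyEvent> {
>>         @Override
>>         public TypeSerializer<MyEvent> restoreSerializer() {
>>             // No ExecutionConfig at this point; anything the serializer needs
>>             // must come from the snapshot itself or from some global state.
>>             return new MySerializer(/* endpoint? */ null);
>>         }
>>         // ... read/write/compatibility methods omitted
>>     }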
>>
>> Ideally there would be a dependency injection mechanism for user code.
>> Discussion in [1] indicated there isn't a direct solution. Has anyone
>> come across a similar use case and found a way to work around this
>> limitation? It might be possible to work with a configuration
>> singleton that initializes from a file in a well-known location
>> (rough sketch below), but that depends on the deployment environment
>> and doesn't play nice with testing.
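>>
>> Roughly along these lines (the file path and key are made up):
>>
>>     import java.io.InputStream;
>>     import java.nio.file.Files;
>>     import java.nio.file.Paths;
>>     import java.util.Properties;
>>
>>     // Hypothetical configuration singleton: lazily loads a properties file
>>     // from a well-known location on whichever JVM the code runs in.
>>     public final class JobConfig {
>>
>>         private static volatile Properties props;
>>
>>         public static String get(String key) {
>>             if (props == null) {
>>                 synchronized (JobConfig.class) {
>>                     if (props == null) {
>>                         Properties p = new Properties();
>>                         try (InputStream in = Files.newInputStream(
>>                                 Paths.get("/etc/myjob/job.properties"))) {
>>                             p.load(in);
>>                         } catch (Exception e) {
>>                             throw new RuntimeException("Cannot load job config", e);
>>                         }
>>                         props = p;
>>                     }
>>                 }
>>             }
>>             return props.getProperty(key);
>>         }
>>     }
>>
>> restoreSerializer could then call JobConfig.get("schema.registry.url"),
>> but the hardcoded path is exactly the environment coupling I'd rather
>> avoid.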
>>
>> Thanks,
>> Thomas
>>
>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>>
>
