Yes I did, thanks for sending it back :) Copying my previous reply for the
ML:

Hey Thomas,
>
> You are correct that there is no way to inject dynamic information into
> the TypeSerializer configured from the TypeSerializerSnapshot, but that
> should not be a problem for your use case.
>
> The type serializer instantiated from a TypeSerializerSnapshot is only
> used to perform schema migrations. Assuming the schema registry enforces
> all changes are backwards compatible, your snapshot instance can always
> return CompatibleAsIs and its serializer will never be used.
>
> The tradeoff here is that when the schema does change, Flink will not
> eagerly migrate all values in state but instead lazily migrate as state
> values are updated.
>
> Seth
>

Currently the TypeSerializerSnapshot logic is completely deterministic, and
my intuition is that we should not change that. Please let us know if what
I described does not work in practice and we can take it from there.

Seth

On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Thomas,
>
> Could you add a sketch of your preferred solution? From what I gathered,
> you have all the information available in your main (probably misunderstood
> that), so what's keeping you from adding the TypeSerializer as a field to
> your UDF?
>
> On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> In my past project I was able to use Spring as a DI provider for Flink
>> Jobs. It actually saves me a lot of hassle while writing/composing jobs and
>> process functions.
>> I was able to use all Spring's Bean annotations along with properties
>> files managed by Spring as it would be a "normal" spring app. The
>> dependencies that I was injecting via Spring were not
>> serialized/deserialized by Flink which actually was something that I wanted
>> to achieved. In some cases it is very hard or maybe even impossible to make
>> some 3rd party classes serializable.
>>
>> Things to highlight here:
>> 1. I did it only for StreamAPI i think it could work also for TableAPI
>> though.
>> 2.I was loading a Spring context from ProcessFunction::open method.
>> I was able to customize via Job parameters which Spring configuration I
>> want to load.
>> After doing this, all fields annotated with @Autowired were injected.
>> 3, I was using standard @Configuration classes
>>
>> Issues:
>> 1. Since i was using operator::open method to load the context, the
>> context will be loaded few times depends on the number of operators
>> deployed on particular Task Manager. This however could be improved.
>> 2. The important thing here was that all your classes have to be
>> "deployed" on every Task Manager/Job Manager in order to load them through
>> DI.
>> We achieved this by using what is called "Job session" cluster. Where our
>> custom Flink docker image was build in a way that it contains our job jar
>> with all dependencies needed.
>>
>> Because of that, we were not be able to use things like AWS EMR or
>> Kinesis.
>>
>> Cheers,
>> Krzysztof Chmielewski
>>
>> wt., 9 lis 2021 o 06:46 Thomas Weise <t...@apache.org> napisaƂ(a):
>>
>>> Hi,
>>>
>>> I was looking into a problem that requires a configurable type
>>> serializer for communication with a schema registry. The service
>>> endpoint can change, so I would not want to make it part of the
>>> serializer snapshot but rather resolve it at graph construction time
>>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
>>> be embedded into a checkpoint).
>>>
>>> TypeSerializer is instantiated via either TypeInformation or
>>> TypeSerializerSnapshot. While TypeInformation provides access to
>>> ExecutionConfig and therefore ability to access parameters from
>>> GlobalJobParameters that could be provided through the entry point,
>>> restoreSerializer requires the serializer to be constructed from the
>>> snapshot state alone.
>>>
>>> Ideally there would be a dependency injection mechanism for user code.
>>> Discussion in [1] indicated there isn't a direct solution. Has anyone
>>> come across a similar use case and found a way to work around this
>>> limitation? It might be possible to work with a configuration
>>> singleton that initializes from a file in a well known location, but
>>> that depends on the deployment environment and doesn't play nice with
>>> testing.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>>>
>>

Reply via email to