Re: Re: Dependency injection for TypeSerializer?

2021-11-12 Thread Thomas Weise
Hi Arvid,

It is correct that we cannot embed endpoint configuration into the
state snapshot as it may change.

Why access a schema registry for internal state? Based on current
TypeSerializer assumptions, we should know all reader schemas at graph
construction time. But let's consider a generic transformation like
filtering/routing/join that only requires knowledge of a few common
characteristics of records. It does not require the exact schema and
should not require restarting to be able to handle records with a new
schema/version.

It appears that such use cases cannot be supported with TypeSerializer:

1. I tried Seth's suggestion with CompatibleAsIs (with the heap state
backend). Unfortunately, the TypeSerializer that is used to deserialize
is still the one restored from the snapshot.

2. I attempted to create a TypeSerializer that keeps track of all
schemas seen during serialization so that it can be restored without an
external dependency. Unfortunately, that also cannot work, because
savepoint creation will first write the serializers before they are
used to write the records (a simplified sketch of this follows below).
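
Roughly, (2) looked like the following (heavily simplified sketch; the class
and field names here are only illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.Map;

// Attempt (2): carry every writer schema seen so far inside the serializer
// snapshot. Declared abstract here only to keep the sketch short.
public abstract class SchemaTrackingSerializerSnapshot
        implements TypeSerializerSnapshot<GenericRecord> {

    // populated by the serializer whenever it writes a record with a new schema id
    private Map<Integer, Schema> schemasSeen;

    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        // This runs before the serializer is used to write the state records,
        // so schemas that are first seen while writing those records are never
        // captured here - which is exactly the problem described above.
        out.writeInt(schemasSeen.size());
        for (Map.Entry<Integer, Schema> entry : schemasSeen.entrySet()) {
            out.writeInt(entry.getKey());
            out.writeUTF(entry.getValue().toString());
        }
    }
}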

Thomas

On Thu, Nov 11, 2021 at 12:37 AM Arvid Heise  wrote:
>
> Thanks for clarifying the issue. I'm assuming you can't go the state 
> migration route because the old endpoint may not be reachable anymore? Is 
> that correct?
>
> Why do you need to use a schema registry for internal state anyways? That's a 
> very atypical use case. You would usually like to exert full control over the 
> schema of state within your application. What happens if the schema in the 
> registry adds new fields? They would be completely ignored without 
> programmatic changes. Or do you also compile the query dynamically?
>
> Usually, you'd treat state like a file and embed the writer schema in the 
> state, so there is no need for an external registry. That should also work 
> with dynamic queries. Only when you output messages into an event queue, 
> you'd need the schema registry but that's orthogonal.
>
> On Wed, Nov 10, 2021 at 7:09 PM Thomas Weise  wrote:
>>
>> Thanks for the feedback!
>>
>> @Seth
>>
>> Your suggestion should work; I have yet to try it out. However, relying on
>> undocumented behavior (return CompatibleAsIs and its serializer will never
>> be used) would make me hesitant to adopt it as a permanent solution.
>>
>> @Arvid
>>
>> There is no issue with constructing the deserializer via main and TypeInfo,
>> although a more explicit way to inject dependencies would be desirable,
>> along the lines of the open method elsewhere. The bigger limitation I run
>> into is the restore-from-snapshot behavior, which requires constructing the
>> TypeSerializer with the no-arg constructor, with no alternative way to pass
>> in a dependency.
>>
>> Thanks,
>> Thomas
>>
>>
>> On 2021/11/10 13:23:21 Seth Wiesman wrote:
>> > Yes I did, thanks for sending it back :) Copying my previous reply for the
>> > ML:
>> >
>> > Hey Thomas,
>> > >
>> > > You are correct that there is no way to inject dynamic information into
>> > > the TypeSerializer configured from the TypeSerializerSnapshot, but that
>> > > should not be a problem for your use case.
>> > >
>> > > The type serializer instantiated from a TypeSerializerSnapshot is only
>> > > used to perform schema migrations. Assuming the schema registry enforces
>> > > all changes are backwards compatible, your snapshot instance can always
>> > > return CompatibleAsIs and its serializer will never be used.
>> > >
>> > > The tradeoff here is that when the schema does change, Flink will not
>> > > eagerly migrate all values in state but instead lazily migrate as state
>> > > values are updated.
>> > >
>> > > Seth
>> > >
>> >
>> > Currently the TypeSerializerSnapshot logic is completely deterministic, and
>> > my intuition is that we should not change that. Please let us know if what
>> > I described does not work in practice and we can take it from there.
>> >
>> > Seth
>> >
>> > On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise  wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > Could you add a sketch of your preferred solution? From what I gathered,
>> > > you have all the information available in your main (probably 
>> > > misunderstood
>> > > that), so what's keeping you from adding the TypeSerializer as a field to
>> > > your UDF?
>> > >
>> > > On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
>> > > krzysiek.chmielew...@gmail.com> wrote:
>> > >
>> > >> Hi,
>> > >> In my past project I was able to use Spring as a DI provider for Flink
>> > >> Jobs. It actually saved me a lot of hassle while writing/composing jobs
>> > >> and process functions.
>> > >> I was able to use all Spring's Bean annotations along with properties
>> > >> files managed by Spring, as if it were a "normal" Spring app. The
>> > >> dependencies that I was injecting via Spring were not
>> > >> serialized/deserialized by Flink, which was actually something that I
>> > >> wanted to achieve. In some cases it is very hard or maybe even
>> > >> impossible to make some 3rd party classes serializable.

Re: Re: Dependency injection for TypeSerializer?

2021-11-11 Thread Arvid Heise
Thanks for clarifying the issue. I'm assuming you can't go the state
migration route because the old endpoint may not be reachable anymore? Is
that correct?

Why do you need to use a schema registry for internal state anyways? That's
a very atypical use case. You would usually like to exert full control over
the schema of state within your application. What happens if the schema in
the registry adds new fields? They would be completely ignored without
programmatic changes. Or do you also compile the query dynamically?

Usually, you'd treat state like a file and embed the writer schema in the
state, so there is no need for an external registry. That should also work
with dynamic queries. Only when you output messages into an event queue,
you'd need the schema registry but that's orthogonal.
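
To make that concrete, here is a minimal sketch of a snapshot that embeds the
writer schema (assuming Avro; the class name is made up, and as far as I
remember the built-in Avro serializer already does essentially this):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

// Sketch: the writer schema travels with the state, so no registry lookup is
// needed on restore. Declared abstract only to keep the sketch short.
public abstract class SchemaEmbeddingSnapshot
        implements TypeSerializerSnapshot<GenericRecord> {

    private Schema writerSchema;

    public SchemaEmbeddingSnapshot() {}                    // used on restore

    public SchemaEmbeddingSnapshot(Schema writerSchema) {  // used at snapshot time
        this.writerSchema = writerSchema;
    }

    @Override
    public int getCurrentVersion() {
        return 1;
    }

    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        out.writeUTF(writerSchema.toString());             // embed the writer schema
    }

    @Override
    public void readSnapshot(int readVersion, DataInputView in, ClassLoader cl)
            throws IOException {
        writerSchema = new Schema.Parser().parse(in.readUTF());
    }

    // restoreSerializer() would build the serializer from writerSchema alone;
    // resolveSchemaCompatibility(...) is omitted to keep the sketch short.
}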

On Wed, Nov 10, 2021 at 7:09 PM Thomas Weise  wrote:

> Thanks for the feedback!
>
> @Seth
>
> Your suggestion should work; I have yet to try it out. However, relying on
> undocumented behavior (return CompatibleAsIs and its serializer will never
> be used) would make me hesitant to adopt it as a permanent solution.
>
> @Arvid
>
> There is no issue with constructing the deserializer via main and
> TypeInfo, although a more explicit way to inject dependencies would be
> desirable, along the lines of the open method elsewhere. The bigger
> limitation I run into is the restore-from-snapshot behavior, which requires
> constructing the TypeSerializer with the no-arg constructor, with no
> alternative way to pass in a dependency.
>
> Thanks,
> Thomas
>
>
> On 2021/11/10 13:23:21 Seth Wiesman wrote:
> > Yes I did, thanks for sending it back :) Copying my previous reply for
> the
> > ML:
> >
> > Hey Thomas,
> > >
> > > You are correct that there is no way to inject dynamic information into
> > > the TypeSerializer configured from the TypeSerializerSnapshot, but that
> > > should not be a problem for your use case.
> > >
> > > The type serializer instantiated from a TypeSerializerSnapshot is only
> > > used to perform schema migrations. Assuming the schema registry
> enforces
> > > all changes are backwards compatible, your snapshot instance can always
> > > return CompatibleAsIs and its serializer will never be used.
> > >
> > > The tradeoff here is that when the schema does change, Flink will not
> > > eagerly migrate all values in state but instead lazily migrate as state
> > > values are updated.
> > >
> > > Seth
> > >
> >
> > Currently the TypeSerializerSnapshot logic is completely deterministic,
> and
> > my intuition is that we should not change that. Please let us know if
> what
> > I described does not work in practice and we can take it from there.
> >
> > Seth
> >
> > On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise  wrote:
> >
> > > Hi Thomas,
> > >
> > > Could you add a sketch of your preferred solution? From what I
> gathered,
> > > you have all the information available in your main (probably
> misunderstood
> > > that), so what's keeping you from adding the TypeSerializer as a field
> to
> > > your UDF?
> > >
> > > On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
> > > krzysiek.chmielew...@gmail.com> wrote:
> > >
> > >> Hi,
> > >> In my past project I was able to use Spring as a DI provider for Flink
> > >> Jobs. It actually saved me a lot of hassle while writing/composing
> > >> jobs and process functions.
> > >> I was able to use all Spring's Bean annotations along with properties
> > >> files managed by Spring, as if it were a "normal" Spring app. The
> > >> dependencies that I was injecting via Spring were not
> > >> serialized/deserialized by Flink, which was actually something that I
> > >> wanted to achieve. In some cases it is very hard or maybe even
> > >> impossible to make some 3rd party classes serializable.
> > >>
> > >> Things to highlight here:
> > >> 1. I did it only for the Stream API; I think it could also work for the
> > >> Table API, though.
> > >> 2. I was loading a Spring context from the ProcessFunction::open method.
> > >> I was able to customize via job parameters which Spring configuration I
> > >> want to load.
> > >> After doing this, all fields annotated with @Autowired were injected.
> > >> 3. I was using standard @Configuration classes.
> > >>
> > >> Issues:
> > >> 1. Since I was using the operator::open method to load the context, the
> > >> context will be loaded a few times, depending on the number of operators
> > >> deployed on a particular Task Manager. This however could be improved.
> > >> 2. The important thing here was that all your classes have to be
> > >> "deployed" on every Task Manager/Job Manager in order to load them
> > >> through DI.
> > >> We achieved this by using what is called a "job session" cluster, where
> > >> our custom Flink Docker image was built in a way that it contains our
> > >> job jar with all dependencies needed.
> > >>
> > >> Because of that, we were not able to use things like AWS EMR or
> > >> Kinesis.
> > >>
> > >> Cheers,
> > >> Krzysztof Chmielewski
> > >>
> > >>

Re: Re: Dependency injection for TypeSerializer?

2021-11-10 Thread Thomas Weise
Thanks for the feedback!

@Seth

Your suggestion should work; I have yet to try it out. However, relying on
undocumented behavior (return CompatibleAsIs and its serializer will never be
used) would make me hesitant to adopt it as a permanent solution.

@Arvid

There is no issue with constructing the deserializer via main and TypeInfo,
although a more explicit way to inject dependencies would be desirable, along
the lines of the open method elsewhere. The bigger limitation I run into is the
restore-from-snapshot behavior, which requires constructing the TypeSerializer
with the no-arg constructor, with no alternative way to pass in a dependency.
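
For context, the restore path looks roughly like this (a simplified sketch of
the framework side; the input view and class loader are assumed to be given):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;

import java.io.IOException;

class RestorePathSketch {
    // 1. the snapshot class is instantiated via its no-arg constructor,
    // 2. readSnapshot(...) only sees the snapshot bytes and a class loader,
    // 3. restoreSerializer() has no hook to receive anything else.
    static TypeSerializer<GenericRecord> restore(
            DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        TypeSerializerSnapshot<GenericRecord> snapshot =
                TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader);
        return snapshot.restoreSerializer();
    }
}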

Thanks,
Thomas


On 2021/11/10 13:23:21 Seth Wiesman wrote:
> Yes I did, thanks for sending it back :) Copying my previous reply for the
> ML:
> 
> Hey Thomas,
> >
> > You are correct that there is no way to inject dynamic information into
> > the TypeSerializer configured from the TypeSerializerSnapshot, but that
> > should not be a problem for your use case.
> >
> > The type serializer instantiated from a TypeSerializerSnapshot is only
> > used to perform schema migrations. Assuming the schema registry enforces
> > all changes are backwards compatible, your snapshot instance can always
> > return CompatibleAsIs and its serializer will never be used.
> >
> > The tradeoff here is that when the schema does change, Flink will not
> > eagerly migrate all values in state but instead lazily migrate as state
> > values are updated.
> >
> > Seth
> >
> 
> Currently the TypeSerializerSnapshot logic is completely deterministic, and
> my intuition is that we should not change that. Please let us know if what
> I described does not work in practice and we can take it from there.
> 
> Seth
> 
> On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise  wrote:
> 
> > Hi Thomas,
> >
> > Could you add a sketch of your preferred solution? From what I gathered,
> > you have all the information available in your main (probably misunderstood
> > that), so what's keeping you from adding the TypeSerializer as a field to
> > your UDF?
> >
> > On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
> > krzysiek.chmielew...@gmail.com> wrote:
> >
> >> Hi,
> >> In my past project I was able to use Spring as a DI provider for Flink
> >> Jobs. It actually saved me a lot of hassle while writing/composing jobs and
> >> process functions.
> >> I was able to use all Spring's Bean annotations along with properties
> >> files managed by Spring, as if it were a "normal" Spring app. The
> >> dependencies that I was injecting via Spring were not
> >> serialized/deserialized by Flink, which was actually something that I wanted
> >> to achieve. In some cases it is very hard or maybe even impossible to make
> >> some 3rd party classes serializable.
> >>
> >> Things to highlight here:
> >> 1. I did it only for the Stream API; I think it could also work for the
> >> Table API, though.
> >> 2. I was loading a Spring context from the ProcessFunction::open method.
> >> I was able to customize via job parameters which Spring configuration I
> >> want to load.
> >> After doing this, all fields annotated with @Autowired were injected.
> >> 3. I was using standard @Configuration classes.
> >>
> >> Issues:
> >> 1. Since I was using the operator::open method to load the context, the
> >> context will be loaded a few times, depending on the number of operators
> >> deployed on a particular Task Manager. This however could be improved.
> >> 2. The important thing here was that all your classes have to be
> >> "deployed" on every Task Manager/Job Manager in order to load them through
> >> DI.
> >> We achieved this by using what is called a "job session" cluster, where our
> >> custom Flink Docker image was built in a way that it contains our job jar
> >> with all dependencies needed.
> >>
> >> Because of that, we were not able to use things like AWS EMR or
> >> Kinesis.
> >>
> >> Cheers,
> >> Krzysztof Chmielewski
> >>
> >> wt., 9 lis 2021 o 06:46 Thomas Weise  napisał(a):
> >>
> >>> Hi,
> >>>
> >>> I was looking into a problem that requires a configurable type
> >>> serializer for communication with a schema registry. The service
> >>> endpoint can change, so I would not want to make it part of the
> >>> serializer snapshot but rather resolve it at graph construction time
> >>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
> >>> be embedded into a checkpoint).
> >>>
> >>> TypeSerializer is instantiated via either TypeInformation or
> >>> TypeSerializerSnapshot. While TypeInformation provides access to
>> > >>> ExecutionConfig and therefore the ability to access parameters from
> >>> GlobalJobParameters that could be provided through the entry point,
> >>> restoreSerializer requires the serializer to be constructed from the
> >>> snapshot state alone.
> >>>
> >>> Ideally there would be a dependency injection mechanism for user code.
> >>> Discussion in [1] indicated there isn't a direct solution. Has anyone
>> > >>> come across a similar use case and found a way to work around this
>> > >>> limitation?

Re: Dependency injection for TypeSerializer?

2021-11-10 Thread Seth Wiesman
Yes I did, thanks for sending it back :) Copying my previous reply for the
ML:

Hey Thomas,
>
> You are correct that there is no way to inject dynamic information into
> the TypeSerializer configured from the TypeSerializerSnapshot, but that
> should not be a problem for your use case.
>
> The type serializer instantiated from a TypeSerializerSnapshot is only
> used to perform schema migrations. Assuming the schema registry enforces
> all changes are backwards compatible, your snapshot instance can always
> return CompatibleAsIs and its serializer will never be used.
>
> The tradeoff here is that when the schema does change, Flink will not
> eagerly migrate all values in state but instead lazily migrate as state
> values are updated.
>
> Seth
>

Currently the TypeSerializerSnapshot logic is completely deterministic, and
my intuition is that we should not change that. Please let us know if what
I described does not work in practice and we can take it from there.
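
Concretely, the relevant piece is just something like this (a sketch; the
class name is made up, and I'm assuming an Avro GenericRecord state type):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

// Declared abstract only to keep the sketch short.
public abstract class RegistryAwareSerializerSnapshot
        implements TypeSerializerSnapshot<GenericRecord> {

    @Override
    public TypeSerializerSchemaCompatibility<GenericRecord> resolveSchemaCompatibility(
            TypeSerializer<GenericRecord> newSerializer) {
        // The registry guarantees backwards compatibility, so no migration is
        // ever needed and the serializer restored from this snapshot is never used.
        return TypeSerializerSchemaCompatibility.compatibleAsIs();
    }
}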

Seth

On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise  wrote:

> Hi Thomas,
>
> Could you add a sketch of your preferred solution? From what I gathered,
> you have all the information available in your main (probably misunderstood
> that), so what's keeping you from adding the TypeSerializer as a field to
> your UDF?
>
> On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> In my past project I was able to use Spring as a DI provider for Flink
>> Jobs. It actually saved me a lot of hassle while writing/composing jobs and
>> process functions.
>> I was able to use all Spring's Bean annotations along with properties
>> files managed by Spring, as if it were a "normal" Spring app. The
>> dependencies that I was injecting via Spring were not
>> serialized/deserialized by Flink, which was actually something that I wanted
>> to achieve. In some cases it is very hard or maybe even impossible to make
>> some 3rd party classes serializable.
>>
>> Things to highlight here:
>> 1. I did it only for the Stream API; I think it could also work for the
>> Table API, though.
>> 2. I was loading a Spring context from the ProcessFunction::open method.
>> I was able to customize via job parameters which Spring configuration I
>> want to load.
>> After doing this, all fields annotated with @Autowired were injected.
>> 3. I was using standard @Configuration classes.
>>
>> Issues:
>> 1. Since I was using the operator::open method to load the context, the
>> context will be loaded a few times, depending on the number of operators
>> deployed on a particular Task Manager. This however could be improved.
>> 2. The important thing here was that all your classes have to be
>> "deployed" on every Task Manager/Job Manager in order to load them through
>> DI.
>> We achieved this by using what is called a "job session" cluster, where our
>> custom Flink Docker image was built in a way that it contains our job jar
>> with all dependencies needed.
>>
>> Because of that, we were not able to use things like AWS EMR or
>> Kinesis.
>>
>> Cheers,
>> Krzysztof Chmielewski
>>
>> wt., 9 lis 2021 o 06:46 Thomas Weise  napisał(a):
>>
>>> Hi,
>>>
>>> I was looking into a problem that requires a configurable type
>>> serializer for communication with a schema registry. The service
>>> endpoint can change, so I would not want to make it part of the
>>> serializer snapshot but rather resolve it at graph construction time
>>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
>>> be embedded into a checkpoint).
>>>
>>> TypeSerializer is instantiated via either TypeInformation or
>>> TypeSerializerSnapshot. While TypeInformation provides access to
>>> ExecutionConfig and therefore the ability to access parameters from
>>> GlobalJobParameters that could be provided through the entry point,
>>> restoreSerializer requires the serializer to be constructed from the
>>> snapshot state alone.
>>>
>>> Ideally there would be a dependency injection mechanism for user code.
>>> Discussion in [1] indicated there isn't a direct solution. Has anyone
>>> come across a similar use case and found a way to work around this
>>> limitation? It might be possible to work with a configuration
>>> singleton that initializes from a file in a well known location, but
>>> that depends on the deployment environment and doesn't play nice with
>>> testing.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>>>
>>


Re: Dependency injection for TypeSerializer?

2021-11-10 Thread Arvid Heise
Hi Thomas,

Could you add a sketch of your preferred solution? From what I gathered,
you have all the information available in your main (probably misunderstood
that), so what's keeping you from adding the TypeSerializer as a field to
your UDF?
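
I.e., something along these lines (just a sketch; the function and type names
are made up):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: build the serializer in main(), where the registry endpoint is known,
// and hand it to the function as a field. TypeSerializer is java.io.Serializable,
// so it ships with the UDF.
public class RouteByHeader extends ProcessFunction<GenericRecord, GenericRecord> {

    private final TypeSerializer<GenericRecord> serializer;

    public RouteByHeader(TypeSerializer<GenericRecord> serializer) {
        this.serializer = serializer;
    }

    @Override
    public void processElement(GenericRecord value, Context ctx, Collector<GenericRecord> out) {
        // use this.serializer as needed, e.g. to copy or re-encode the record
        out.collect(value);
    }
}

// in main(), with the endpoint taken from the job arguments:
//   stream.process(new RouteByHeader(serializerBuiltFromRegistryUrl));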

On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> In my past project I was able to use Spring as a DI provider for Flink
> Jobs. It actually saved me a lot of hassle while writing/composing jobs and
> process functions.
> I was able to use all Spring's Bean annotations along with properties
> files managed by Spring, as if it were a "normal" Spring app. The
> dependencies that I was injecting via Spring were not
> serialized/deserialized by Flink, which was actually something that I wanted
> to achieve. In some cases it is very hard or maybe even impossible to make
> some 3rd party classes serializable.
>
> Things to highlight here:
> 1. I did it only for the Stream API; I think it could also work for the
> Table API, though.
> 2. I was loading a Spring context from the ProcessFunction::open method.
> I was able to customize via job parameters which Spring configuration I
> want to load.
> After doing this, all fields annotated with @Autowired were injected.
> 3. I was using standard @Configuration classes.
>
> Issues:
> 1. Since I was using the operator::open method to load the context, the
> context will be loaded a few times, depending on the number of operators
> deployed on a particular Task Manager. This however could be improved.
> 2. The important thing here was that all your classes have to be
> "deployed" on every Task Manager/Job Manager in order to load them through
> DI.
> We achieved this by using what is called a "job session" cluster, where our
> custom Flink Docker image was built in a way that it contains our job jar
> with all dependencies needed.
>
> Because of that, we were not able to use things like AWS EMR or Kinesis.
>
> Cheers,
> Krzysztof Chmielewski
>
> wt., 9 lis 2021 o 06:46 Thomas Weise  napisał(a):
>
>> Hi,
>>
>> I was looking into a problem that requires a configurable type
>> serializer for communication with a schema registry. The service
>> endpoint can change, so I would not want to make it part of the
>> serializer snapshot but rather resolve it at graph construction time
>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
>> be embedded into a checkpoint).
>>
>> TypeSerializer is instantiated via either TypeInformation or
>> TypeSerializerSnapshot. While TypeInformation provides access to
>> ExecutionConfig and therefore the ability to access parameters from
>> GlobalJobParameters that could be provided through the entry point,
>> restoreSerializer requires the serializer to be constructed from the
>> snapshot state alone.
>>
>> Ideally there would be a dependency injection mechanism for user code.
>> Discussion in [1] indicated there isn't a direct solution. Has anyone
>> come across a similar use case and found a way to work around this
>> limitation? It might be possible to work with a configuration
>> singleton that initializes from a file in a well known location, but
>> that depends on the deployment environment and doesn't play nice with
>> testing.
>>
>> Thanks,
>> Thomas
>>
>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>>
>


Re: Dependency injection for TypeSerializer?

2021-11-09 Thread Krzysztof Chmielewski
Hi,
In my past project I was able to use Spring as a DI provider for Flink
Jobs. It actually saved me a lot of hassle while writing/composing jobs and
process functions.
I was able to use all Spring's Bean annotations along with properties files
managed by Spring, as if it were a "normal" Spring app. The dependencies
that I was injecting via Spring were not serialized/deserialized by Flink,
which was actually something that I wanted to achieve. In some cases it is
very hard or maybe even impossible to make some 3rd party classes
serializable.

Things to highlight here:
1. I did it only for the Stream API; I think it could also work for the
Table API, though.
2. I was loading a Spring context from the ProcessFunction::open method (see
the sketch below).
I was able to customize via job parameters which Spring configuration I
want to load.
After doing this, all fields annotated with @Autowired were injected.
3. I was using standard @Configuration classes.
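
Roughly like this (a simplified sketch; MyJobConfiguration and MyLookupService
are made-up names, and in our setup the configuration class was actually picked
based on the job parameters):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class EnrichFn extends ProcessFunction<String, String> {

    // injected by Spring in open(), never serialized/deserialized by Flink
    @Autowired
    private transient MyLookupService lookupService;

    @Override
    public void open(Configuration parameters) {
        // load the Spring @Configuration class and autowire this instance;
        // note: one context per operator instance and never closed here (see issue 1 below)
        AnnotationConfigApplicationContext springCtx =
                new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        springCtx.getAutowireCapableBeanFactory().autowireBean(this);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(lookupService.enrich(value));
    }
}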

Issues:
1. Since I was using the operator::open method to load the context, the context
will be loaded a few times, depending on the number of operators deployed on a
particular Task Manager. This however could be improved.
2. The important thing here was that all your classes have to be "deployed"
on every Task Manager/Job Manager in order to load them through DI.
We achieved this by using what is called a "job session" cluster, where our
custom Flink Docker image was built in a way that it contains our job jar
with all dependencies needed.

Because of that, we were not able to use things like AWS EMR or Kinesis.

Cheers,
Krzysztof Chmielewski

wt., 9 lis 2021 o 06:46 Thomas Weise  napisał(a):

> Hi,
>
> I was looking into a problem that requires a configurable type
> serializer for communication with a schema registry. The service
> endpoint can change, so I would not want to make it part of the
> serializer snapshot but rather resolve it at graph construction time
> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
> be embedded into a checkpoint).
>
> TypeSerializer is instantiated via either TypeInformation or
> TypeSerializerSnapshot. While TypeInformation provides access to
> ExecutionConfig and therefore the ability to access parameters from
> GlobalJobParameters that could be provided through the entry point,
> restoreSerializer requires the serializer to be constructed from the
> snapshot state alone.
>
> Ideally there would be a dependency injection mechanism for user code.
> Discussion in [1] indicated there isn't a direct solution. Has anyone
> come across a similar use case and found a way to work around this
> limitation? It might be possible to work with a configuration
> singleton that initializes from a file in a well known location, but
> that depends on the deployment environment and doesn't play nice with
> testing.
>
> Thanks,
> Thomas
>
> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>


Dependency injection for TypeSerializer?

2021-11-08 Thread Thomas Weise
Hi,

I was looking into a problem that requires a configurable type
serializer for communication with a schema registry. The service
endpoint can change, so I would not want to make it part of the
serializer snapshot but rather resolve it at graph construction time
(similar to how a Kafka bootstrap URL or JDBC connection URL would not
be embedded into a checkpoint).

TypeSerializer is instantiated via either TypeInformation or
TypeSerializerSnapshot. While TypeInformation provides access to
ExecutionConfig and therefore the ability to access parameters from
GlobalJobParameters that could be provided through the entry point,
restoreSerializer requires the serializer to be constructed from the
snapshot state alone.
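
To make the asymmetry concrete (sketch only; RegistryAwareSerializer is a
made-up serializer class, and I'm assuming Avro GenericRecord as the record
type):

import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

// Path 1: via TypeInformation - the endpoint can come from GlobalJobParameters.
// Declared abstract only to keep the sketch short.
public abstract class RegistryAwareTypeInfo extends TypeInformation<GenericRecord> {
    @Override
    public TypeSerializer<GenericRecord> createSerializer(ExecutionConfig config) {
        Map<String, String> params = config.getGlobalJobParameters().toMap();
        // RegistryAwareSerializer is a hypothetical serializer, for illustration only
        return new RegistryAwareSerializer(params.get("schema.registry.url"));
    }
    // remaining TypeInformation methods omitted
}

// Path 2: via TypeSerializerSnapshot - only the snapshot bytes are available,
// there is no hook to pass the endpoint in.
abstract class RegistryAwareSerializerSnapshot
        implements TypeSerializerSnapshot<GenericRecord> {
    @Override
    public TypeSerializer<GenericRecord> restoreSerializer() {
        return new RegistryAwareSerializer(/* endpoint unknown here */ null);
    }
}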

Ideally there would be a dependency injection mechanism for user code.
Discussion in [1] indicated there isn't a direct solution. Has anyone
come across a similar use case and found a way to work around this
limitation? It might be possible to work with a configuration
singleton that initializes from a file in a well known location, but
that depends on the deployment environment and doesn't play nice with
testing.

Thanks,
Thomas

[1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o