Seeking suggestions for ingesting large amounts of data from S3

2023-02-09 Thread Yang Liu
Hi all,

We are trying to ingest large amounts of data (20TB) from S3 using the Flink
filesystem connector to bootstrap a Hudi table. The data is well partitioned
in S3 by date/time, but we have been facing OOM issues in the Flink jobs, so we
wanted to update the Flink job to ingest the data chunk by chunk (partition
by partition) by some kind of looping instead of all at once. Curious
what’s the recommended way to do this in Flink. I believe this should be a
common use case, so hope to get some ideas here.

We have been using the Table API, but we are open to other APIs.
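In case it helps, this is a minimal sketch of the looping shape I have in mind
(the table names, the partition column `dt`, and the date range are made up,
and the connector DDL is omitted):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.time.LocalDate;

public class PartitionedBackfill {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // source_table: filesystem connector over the S3 prefix; sink_table: the Hudi table.
        // Submit one bounded INSERT per partition instead of one job over all 20TB:
        for (LocalDate d = LocalDate.of(2022, 1, 1);
             !d.isAfter(LocalDate.of(2022, 12, 31));
             d = d.plusDays(1)) {
            tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table WHERE dt = '" + d + "'")
                .await(); // block until this chunk finishes before starting the next one
        }
    }
}
```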

Thanks & Regards
Eric


Re: Kryo version 2.24.0 from 2015?

2023-02-09 Thread Clayton Wohl
It sounds like the better option is to make a Flink bridge release that
runs both Kryo v2 and Kryo v5 side-by-side.

The code base for the Flink bridge release should only use Kryo v2 for
deserializing legacy data, and use Kryo v5 for serializing+deserializing
new data.

User APIs for registering custom serializers would need to change to accept
both v2 and v5 serializers, rather than v2 only. This would be a breaking
API change, but it's the least bad choice. Staying on Kryo v2 is not a good
option.

This approach wouldn't require application downtime beyond what is normal
for Flink upgrades. It would require a breaking API change to custom
serializers but that is unavoidable.

Then, in a future Flink release, you can drop Kryo v2 entirely, and be Kryo
v5 only.
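For illustration, a minimal sketch of the side-by-side idea, assuming the
versioned v5 artifact the migration guide mentions (artifact coordinates and
package names are from memory, untested):

```java
// Assumes both kryo 2.24.0 and the "versioned" v5 artifact
// (com.esotericsoftware.kryo:kryo5, classes relocated under
// com.esotericsoftware.kryo.kryo5.*) are on the classpath.
import com.esotericsoftware.kryo.Kryo;            // v2
import com.esotericsoftware.kryo.io.Input;        // v2
import com.esotericsoftware.kryo.kryo5.io.Output; // v5, versioned package

public class KryoBridge {
    private final Kryo legacy = new Kryo(); // deserializes legacy data only
    private final com.esotericsoftware.kryo.kryo5.Kryo current =
            new com.esotericsoftware.kryo.kryo5.Kryo(); // serializes new data

    public KryoBridge() {
        current.setRegistrationRequired(false); // v5 requires registration by default
    }

    /** Read a value written by Kryo v2 and re-serialize it with Kryo v5. */
    public byte[] migrate(byte[] v2Bytes, Class<?> type) {
        Object value = legacy.readObject(new Input(v2Bytes), type);
        Output out = new Output(64, -1); // small initial buffer, unlimited growth
        current.writeObject(out, value);
        return out.toBytes();
    }
}
```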

On Thu, Feb 9, 2023 at 11:21 AM Chesnay Schepler wrote:

> > Can't programmers just code up migration tools to the current version of
> Kryo or whatever serialization platform you choose?
>
> Well yes, if someone writes a tool to implement a reasonable migration
> path then we may be able to upgrade Kryo.
> Until that happens we are blocked on upgrading Kryo.
>
> > Versions older than 3.x aren't supposed to compile or run correctly
> under Java 11+.
>
> In fact our Java 17 support is currently blocked by an issue that we
> suspect is related to Kryo.
> https://issues.apache.org/jira/browse/FLINK-24998
>
> > I'd presume you would make a tool to upgrade files with Kryo persisted
> state in savepoints and checkpoints
>
> That doesn't cover everything and may not necessarily be a viable approach.
>
> Kryo is exposed a fair bit in our APIs (mistakes of the past...) so users
> that have custom Serializers might also have to change things.
> Upgrading Kryo is thus also an API breaking change.
>
> As for viability, such an approach implies taking down the application on
> one Flink version, converting the state, and restarting the job on a newer
> Flink version.
> This implies a certain amount of downtime for the application, which
> depending on the state size may just not be acceptable to a user.
> Having to migrate the savepoint and upgrading Flink at the same time is
> also not ideal since it makes the effort more complicated; being able to
> run the job on the same Flink version with a different Kryo version would
> make things easier, but that'd mean we have to be able to run 2 Kryo
> versions in parallel.
>
> Something else to consider: if we already break everything to upgrade
> Kryo, then maybe things should be rewritten such that upgrading Kryo isn't
> such a problem in the future; in essence, reworking how Kryo is integrated
> into Flink.
>
> That said, the v5 migration guide is quite interesting; specifically that
> Kryo offers a versioned jar.
>
> On 09/02/2023 17:32, Clayton Wohl wrote:
>
> What do you mean you are blocked? Can't programmers just code up migration
> tools to the current version of Kryo or whatever serialization platform you
> choose?
>
> Can't you follow the Kryo migration guide that supports loading data
> serialized with Kryo v2 and reserializing with Kryo v5?
> https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5
>
> I'd presume you would make a tool to upgrade files with Kryo persisted
> state in savepoints and checkpoints, that would allow for users to register
> custom serializers. I also presume that new versions of Flink would
> politely refuse to start with old format state files and require the
> migration process to be completed.
>
> Kryo v2 also pulls in objenesis v2.1 from 2013, before Java 8. Versions
> older than 3.x aren't supposed to compile or run correctly under Java 11+.
>
>
>
> On Thu, Feb 9, 2023 at 2:34 AM Chesnay Schepler wrote:
>
>>  > you can't reasonably stay on the 2015 version forever, refuse to
>> adopt any of the updates or fixes in the 8 years since then, and
>> reasonably expect things to continue to work well.
>>
>> We are well aware that Kryo is a ticking time bomb.
>>
>>  > Is there any possibility a future release of Flink can upgrade to a
>> recent version of Kryo serialization?
>>
>> Of course there is, but someone needs to figure out a way to do this
>> without breaking everything, or provide a reasonable upgrade path;
>> that is what has been blocking us so far.
>>
>> On 09/02/2023 07:34, Clayton Wohl wrote:
>> > I've noticed the latest Flink is using the Kryo serializer library
>> > version 2.24.0 which dates back to 2015!
>> >
>> > The Kryo project is actively maintained, it's on version 5.4.0, so
>> > 2.24.0 is really quite ancient. I presume the concern is maintaining
>> > compatibility with persisted savepoints. That's a valid concern, but
>> > you can't reasonably stay on the 2015 version forever, refuse to adopt
>> > any of the updates or fixes in the 8 years since then, and reasonably
>> > expect things to continue to work well.
>> >
>> > Is there any possibility a future release of Flink can upgrade to a
>> > recent version of Kryo serialization?
>> >
>> >
>>

Re: Kryo version 2.24.0 from 2015?

2023-02-09 Thread Chesnay Schepler
> Can't programmers just code up migration tools to the current version
> of Kryo or whatever serialization platform you choose?


Well yes, if someone writes a tool to implement a reasonable migration
path then we may be able to upgrade Kryo.

Until that happens we are blocked on upgrading Kryo.

> Versions older than 3.x aren't supposed to compile or run correctly
> under Java 11+.


In fact our Java 17 support is currently blocked by an issue that we 
suspect is related to Kryo.

https://issues.apache.org/jira/browse/FLINK-24998

> I'd presume you would make a tool to upgrade files with Kryo
> persisted state in savepoints and checkpoints


That doesn't cover everything and may not necessarily be a viable approach.

Kryo is exposed a fair bit in our APIs (mistakes of the past...) so 
users that have custom Serializers might also have to change things.

Upgrading Kryo is thus also an API breaking change.

As for viability, such an approach implies taking down the application 
on one Flink version, converting the state, and restarting the job on a 
newer Flink version.
This implies a certain amount of downtime for the application, which 
depending on the state size may just not be acceptable to a user.
Having to migrate the savepoint and upgrading Flink at the same time is 
also not ideal since it makes the effort more complicated; being able to 
run the job on the same Flink version with a different Kryo version 
would make things easier, but that'd mean we have to be able to run 2 
Kryo versions in parallel.


Something else to consider: if we already break everything to upgrade
Kryo, then maybe things should be rewritten such that upgrading Kryo
isn't such a problem in the future; in essence, reworking how Kryo is
integrated into Flink.


That said, the v5 migration guide is quite interesting; specifically 
that Kryo offers a versioned jar.


On 09/02/2023 17:32, Clayton Wohl wrote:
> What do you mean you are blocked? Can't programmers just code up
> migration tools to the current version of Kryo or whatever
> serialization platform you choose?
>
> Can't you follow the Kryo migration guide that supports loading data
> serialized with Kryo v2 and reserializing with Kryo v5?
> https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5
>
> I'd presume you would make a tool to upgrade files with Kryo persisted
> state in savepoints and checkpoints, that would allow for users to
> register custom serializers. I also presume that new versions of Flink
> would politely refuse to start with old format state files and require
> the migration process to be completed.
>
> Kryo v2 also pulls in objenesis v2.1 from 2013, before Java 8.
> Versions older than 3.x aren't supposed to compile or run correctly
> under Java 11+.
>
> On Thu, Feb 9, 2023 at 2:34 AM Chesnay Schepler wrote:
>
>>  > you can't reasonably stay on the 2015 version forever, refuse to
>> adopt any of the updates or fixes in the 8 years since then, and
>> reasonably expect things to continue to work well.
>>
>> We are well aware that Kryo is a ticking time bomb.
>>
>>  > Is there any possibility a future release of Flink can upgrade to a
>> recent version of Kryo serialization?
>>
>> Of course there is, but someone needs to figure out a way to do this
>> without breaking everything, or provide a reasonable upgrade path;
>> that is what has been blocking us so far.
>>
>> On 09/02/2023 07:34, Clayton Wohl wrote:
>> > I've noticed the latest Flink is using the Kryo serializer library
>> > version 2.24.0 which dates back to 2015!
>> >
>> > The Kryo project is actively maintained, it's on version 5.4.0, so
>> > 2.24.0 is really quite ancient. I presume the concern is maintaining
>> > compatibility with persisted savepoints. That's a valid concern, but
>> > you can't reasonably stay on the 2015 version forever, refuse to adopt
>> > any of the updates or fixes in the 8 years since then, and reasonably
>> > expect things to continue to work well.
>> >
>> > Is there any possibility a future release of Flink can upgrade to a
>> > recent version of Kryo serialization?



Re: Kryo version 2.24.0 from 2015?

2023-02-09 Thread Clayton Wohl
What do you mean you are blocked? Can't programmers just code up migration
tools to the current version of Kryo or whatever serialization platform you
choose?

Can't you follow the Kryo migration guide that supports loading data
serialized with Kryo v2 and reserializing with Kryo v5?
https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5

I'd presume you would make a tool to upgrade files with Kryo persisted
state in savepoints and checkpoints, that would allow for users to register
custom serializers. I also presume that new versions of Flink would
politely refuse to start with old format state files and require the
migration process to be completed.

Kryo v2 also pulls in objenesis v2.1 from 2013, before Java 8. Versions
older than 3.x aren't supposed to compile or run correctly under Java 11+.



On Thu, Feb 9, 2023 at 2:34 AM Chesnay Schepler wrote:

>  > you can't reasonably stay on the 2015 version forever, refuse to
> adopt any of the updates or fixes in the 8 years since then, and
> reasonably expect things to continue to work well.
>
> We are well aware that Kryo is a ticking time bomb.
>
>  > Is there any possibility a future release of Flink can upgrade to a
> recent version of Kryo serialization?
>
> Of course there is, but someone needs to figure out a way to do this
> without breaking everything, or provide a reasonable upgrade path;
> that is what has been blocking us so far.
>
> On 09/02/2023 07:34, Clayton Wohl wrote:
> > I've noticed the latest Flink is using the Kryo serializer library
> > version 2.24.0 which dates back to 2015!
> >
> > The Kryo project is actively maintained, it's on version 5.4.0, so
> > 2.24.0 is really quite ancient. I presume the concern is maintaining
> > compatibility with persisted savepoints. That's a valid concern, but
> > you can't reasonably stay on the 2015 version forever, refuse to adopt
> > any of the updates or fixes in the 8 years since then, and reasonably
> > expect things to continue to work well.
> >
> > Is there any possibility a future release of Flink can upgrade to a
> > recent version of Kryo serialization?
> >
> >
> >
>
>


Using Parquet format in Flink hosted in k8s operator

2023-02-09 Thread Frank Lyaruu
Hi all, I’m using the Flink k8s operator to run a SQL stream to/from
various connectors, and just added a Parquet format. I customized the image
a bit per the example (mostly by adding maven downloads of flink-connector*
jars). If I do that for flink-parquet-1.16.1, it fails on a missing
org/apache/hadoop/conf/Configuration class.

I started adding hadoop-common (which contains that class), but that jar is
huge and has a bunch of transitive deps (even that single class does), so
that would be quite the rabbit hole. I see an old thread that seems very
similar: https://www.mail-archive.com/user@flink.apache.org/msg43028.html
but without any conclusion.
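For reference, the extra downloads I'm experimenting with look roughly like
this; the shaded-Hadoop uber jar on the second line is one option I've seen
suggested for supplying the missing Hadoop classes without pulling in all of
hadoop-common's dependencies (the versions are guesses on my part):

```
org.apache.flink:flink-parquet:1.16.1
org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0
```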

How _is_ this supposed to work? The flink docs on the parquet format don't
mention anything special:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/parquet/

regards, Frank


Re: How to start a timer when an event occurs?

2023-02-09 Thread Peter Schrott
Hi Emarotti,

It sounds like you want to process your streams based on event time. This
means all your processing, windowing, timers and co. are based on a timestamp
that is provided by the individual events. You can read more about it here:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/concepts/time/

When you want to work with timers, I can point you towards Flink's
“ProcessFunction”. In this low-level streaming operator you have more control
over state, event time, watermarks and timers. You can, as you desire,
register timers on certain incoming events and react when the timer is
triggered. You can read more about it here:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/process_function/
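To make it a bit more concrete, here is a rough sketch of what that could look
like for your A/C example (untested; the Event/Result types, the field names
and the 1-second update interval are my assumptions):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Key the stream by sourceId; start timing on event "A", emit the elapsed
// time every second (in event time), and stop on event "C".
public class EventTimer extends KeyedProcessFunction<Integer, EventTimer.Event, EventTimer.Result> {

    public static class Event { public String name; public int sourceId; public long timestamp; }
    public static class Result {
        public int sourceId; public long elapsedMs; public boolean done;
        public Result(int sourceId, long elapsedMs, boolean done) {
            this.sourceId = sourceId; this.elapsedMs = elapsedMs; this.done = done;
        }
    }

    private transient ValueState<Long> startTs;   // event time of the "A" event
    private transient ValueState<Long> nextTimer; // next registered timer, so we can delete it

    @Override
    public void open(Configuration parameters) {
        startTs = getRuntimeContext().getState(new ValueStateDescriptor<>("startTs", Long.class));
        nextTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("nextTimer", Long.class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
        if ("A".equals(e.name)) {
            startTs.update(e.timestamp);
            long t = e.timestamp + 1000;
            nextTimer.update(t);
            ctx.timerService().registerEventTimeTimer(t); // fires as the watermark advances
        } else if ("C".equals(e.name) && startTs.value() != null) {
            out.collect(new Result(ctx.getCurrentKey(), e.timestamp - startTs.value(), true));
            if (nextTimer.value() != null) {
                ctx.timerService().deleteEventTimeTimer(nextTimer.value());
            }
            startTs.clear();
            nextTimer.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        if (startTs.value() == null) {
            return; // "C" already arrived, nothing left to report
        }
        out.collect(new Result(ctx.getCurrentKey(), timestamp - startTs.value(), false));
        long t = timestamp + 1000;
        nextTimer.update(t);
        ctx.timerService().registerEventTimeTimer(t);
    }
}
```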

For your use case it’s very important to understand the principles of how
event-time-based streams are handled and what role watermarks play here. It’s
also possible to handle and react to late events.

I hope this helps to get you started. 

Best, Peter


> On 9. Feb 2023, at 08:22, Eugenio Marotti wrote:
> 
> Hi everyone. I'm developing a monitoring app and I want to use Flink to 
> process the event stream. I need to start a timer when an event is received 
> in Flink, send the timer value and stop the timer when another event is 
> received. Let me explain better. An event consists of an event name, a source 
> id and other fields. So I have something like this:
> 
> E1("A",1,...) -> E2("B",1,...) -> E3("C",1,...)
> 
> When I receive event "A" I want to start a timer (keyed by the source id) and 
> update a sink with the timer value periodically. When I receive event "C" I 
> want to stop the timer and update the sink with the final timer value. Is 
> there a way to accomplish that in Apache Flink?



Re: Kryo version 2.24.0 from 2015?

2023-02-09 Thread Chesnay Schepler
> you can't reasonably stay on the 2015 version forever, refuse to
> adopt any of the updates or fixes in the 8 years since then, and
> reasonably expect things to continue to work well.


We are well aware that Kryo is a ticking time bomb.

> Is there any possibility a future release of Flink can upgrade to a
> recent version of Kryo serialization?


Of course there is, but someone needs to figure out a way to do this
without breaking everything, or provide a reasonable upgrade path;
that is what has been blocking us so far.


On 09/02/2023 07:34, Clayton Wohl wrote:
> I've noticed the latest Flink is using the Kryo serializer library
> version 2.24.0 which dates back to 2015!
>
> The Kryo project is actively maintained, it's on version 5.4.0, so
> 2.24.0 is really quite ancient. I presume the concern is maintaining
> compatibility with persisted savepoints. That's a valid concern, but
> you can't reasonably stay on the 2015 version forever, refuse to adopt
> any of the updates or fixes in the 8 years since then, and reasonably
> expect things to continue to work well.
>
> Is there any possibility a future release of Flink can upgrade to a
> recent version of Kryo serialization?