Re: Temporal join on rolling aggregate

2024-02-22 Thread mayaming1983
+1 for supporting defining time attributes on views.

I once encountered the same problem as yours. I did some regular joins and lost 
the time attribute, and hence could no longer do window operations in subsequent 
logic. I had to output the joined view to Kafka, read from it again, and define a 
watermark on the new source - a cumbersome workaround.
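In SQL terms, the round trip looks roughly like this (a minimal sketch only; the
topic name, connector options and the extra update_time column are illustrative
assumptions, reusing the table names from the example quoted below):

-- 1. Write the aggregate (plus some update timestamp) to a Kafka topic, e.g. via an
--    upsert-kafka sink table keyed by currency_id. The timestamp has to be produced
--    by the query itself, e.g. a processing-time stamp.
--
-- 2. Read the topic back as a new source and declare the primary key and watermark
--    there, so the temporal join sees a proper versioned table:
CREATE TABLE max_rates_versioned (
    currency_id INT,
    max_rate DECIMAL(4, 3),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '2' SECOND,
    PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'max_rates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- 3. Join against the re-read table instead of the view:
-- SELECT o.order_id, r.max_rate
-- FROM orders AS o
--     LEFT JOIN max_rates_versioned FOR SYSTEM_TIME AS OF o.order_time AS r
--     ON o.currency_id = r.currency_id;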

It would be more flexible if we could control time attribute / watermark on 
views, just as if it's some kind of special source.

Thanks,
Yaming
On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra wrote:
> Posting this to dev as well as it potentially has some implications on 
> development effort.
>
> What seems to be the problem here is that we cannot control/override 
> Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you 
> cannot create a PRIMARY KEY on the view but I think the temporal join also 
> should not require the PK, should we remove this limitation?
>
> The general problem is the inflexibility of the timestamp/watermark handling 
> on query outputs, which makes this again impossible.
>
> The workaround here can be to write the rolling aggregate to Kafka, read it 
> back again and join with that. The fact that this workaround is possible 
> actually highlights the need for more flexibility on the query/view side in 
> my opinion.
>
> Has anyone else run into this issue and considered the proper solution to the 
> problem? Feels like it must be pretty common :)
>
> Cheers,
> Gyula
>
>
>
>
> > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley  
> > wrote:
> > > Hi,
> > >
> > > I have been trying to write a temporal join in SQL on a rolling 
> > > aggregate view. However, it does not work and throws:
> > >
> > > org.apache.flink.table.api.ValidationException: Event-Time Temporal Table 
> > > Join requires both primary key and row time attribute in versioned table, 
> > > but no row time attribute can be found.
> > >
> > > It seems that after the aggregation, the table loses the watermark and 
> > > it's not possible to add one with the SQL API as it's a view.
> > >
> > > CREATE TABLE orders (
> > >     order_id INT,
> > >     price DECIMAL(6, 2),
> > >     currency_id INT,
> > >     order_time AS NOW(),
> > >     WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.order_id.kind' = 'sequence',
> > >         'fields.order_id.start' = '1',
> > >         'fields.order_id.end' = '10',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TABLE currency_rates (
> > >     currency_id INT,
> > >     conversion_rate DECIMAL(4, 3),
> > >     PRIMARY KEY (currency_id) NOT ENFORCED
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TEMPORARY VIEW max_rates AS (
> > >     SELECT
> > >         currency_id,
> > >         MAX(conversion_rate) AS max_rate
> > >     FROM currency_rates
> > >     GROUP BY currency_id
> > > );
> > >
> > > CREATE TEMPORARY VIEW temporal_join AS (
> > >     SELECT
> > >         order_id,
> > >         max_rates.max_rate
> > >     FROM orders
> > >          LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > >          ON orders.currency_id = max_rates.currency_id
> > > );
> > >
> > > SELECT * FROM temporal_join;
> > >
> > > Am I missing something? What would be a good starting point to address 
> > > this?
> > >
> > > Thanks in advance,
> > > Sébastien Chevalley


Re: FW: RE: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener

2024-02-22 Thread Yong Fang
Hi Martijn,

If there're no more comments, I will start a vote for this, thanks

Best,
Fang Yong

On Tue, Feb 20, 2024 at 4:53 PM Yong Fang  wrote:

> Hi Martijn,
>
> Thank you for your attention. Let me first explain the specific situation
> of FLIP-314. FLIP-314 is currently in an accepted state, but actual code
> development has not yet begun, and interface related PR has not been merged
> into the master. So it may not be necessary for us to create a separate
> FLIP. Currently, my idea is to directly update the interface on FLIP-314,
> but to initiate a separate thread with the context and we can vote there.
>
> What do you think? Thanks
>
> Best,
> Fang Yong
>
> On Mon, Feb 19, 2024 at 8:27 PM Martijn Visser 
> wrote:
>
>> I'm a bit confused: did we add new interfaces after FLIP-314 was
>> accepted? If so, please move the new interfaces to a new FLIP and
>> start a separate vote. We can't retrospectively change an accepted
>> FLIP with new interfaces and a new vote.
>>
>> On Mon, Feb 19, 2024 at 3:22 AM Yong Fang  wrote:
>> >
>> > Hi all,
>> >
>> > If there are no more feedbacks, I will start a vote for the new
>> interfaces
>> > in the next day, thanks
>> >
>> > Best,
>> > Fang Yong
>> >
>> > On Thu, Feb 8, 2024 at 1:30 PM Yong Fang  wrote:
>> >
>> > > Hi devs,
>> > >
>> > > According to the online-discussion in FLINK-3127 [1] and
>> > > offline-discussion with Maciej Obuchowski and Zhenqiu Huang, we would
>> like
>> > > to update the lineage vertex relevant interfaces in FLIP-314 [2] as
>> follows:
>> > >
>> > > 1. Introduce `LineageDataset` which represents source and sink in
>> > > `LineageVertex`. The fields in `LineageDataset` are as follows:
>> > > /* Name for this particular dataset. */
>> > > String name;
>> > > /* Unique name for this dataset's storage, for example, url for
>> jdbc
>> > > connector and location for lakehouse connector. */
>> > > String namespace;
>> > > /* Facets for the lineage vertex to describe the particular
>> > > information of dataset, such as schema and config. */
>> > > Map<String, LineageDatasetFacet> facets;
>> > >
>> > > 2. There may be multiple datasets in one `LineageVertex`, for example,
>> > > kafka source or hybrid source. So users can get dataset list from
>> > > `LineageVertex`:
>> > > /** Get datasets from the lineage vertex. */
>> > > List<LineageDataset> datasets();
>> > >
>> > > 3. There will be built in facets for config and schema. To describe
>> > > columns in table/sql jobs and datastream jobs, we introduce
>> > > `DatasetSchemaField`.
>> > > /** Builtin config facet for dataset. */
>> > > @PublicEvolving
>> > > public interface DatasetConfigFacet extends LineageDatasetFacet {
>> > > Map<String, String> config();
>> > > }
>> > >
>> > > /** Field for schema in dataset. */
>> > > public interface DatasetSchemaField<T> {
>> > > /** The name of the field. */
>> > > String name();
>> > > /** The type of the field. */
>> > > T type();
>> > > }
>> > >
>> > > Thanks for valuable inputs from @Maciej and @Zhenqiu. And looking
>> forward
>> > > to your feedback, thanks
>> > >
>> > > Best,
>> > > Fang Yong
>> > >
>> > > On Mon, Sep 25, 2023 at 1:18 PM Shammon FY  wrote:
>> > >
>> > >> Hi David,
>> > >>
>> > >> Do you want the detailed topology for Flink job? You can get
>> > >> `JobDetailsInfo` in `RestClusterClient` with the submitted job id, it
>> has
>> > >> `String jsonPlan`. You can parse the json plan to get all steps and
>> > >> relations between them in a Flink job. Hope this can help you,
>> thanks!
>> > >>
>> > >> Best,
>> > >> Shammon FY
>> > >>
>> > >> On Tue, Sep 19, 2023 at 11:46 PM David Radley <
>> david_rad...@uk.ibm.com>
>> > >> wrote:
>> > >>
>> > >>> Hi there,
>> > >>> I am looking at the interfaces. If I am reading it correctly, there is
>> > >>> one relationship between the source and sink, and this relationship
>> > >>> represents the operational lineage. Lineage is usually represented as
>> > >>> asset -> process -> asset – see for example
>> > >>> https://egeria-project.org/features/lineage-management/overview/#the-lineage-graph
>> > >>>
>> > >>> Maybe I am missing it, but it seems to be that it would be useful to
>> > >>> store the process in the lineage graph.
>> > >>>
>> > >>> It is useful to have the top level lineage as source -> Flink job ->
>> > >>> sink. Where the Flink job is the process, but also to have this
>> asset ->
>> > >>> process -> asset pattern for each of the steps in the job. If this
>> is
>> > >>> present, please could you point me to it,
>> > >>>
>> > >>>   Kind regards, David.
>> > >>>
>> > >>>
>> > >>>
>> > >>>
>> > >>>
>> > >>> From: David Radley 
>> > >>> Date: Tuesday, 19 September 2023 at 16:11
>> > >>> To: dev@flink.apache.org 
>> > >>> Subject: [EXTERNAL] RE: [DISCUSS] FLIP-314: Support Customized Job
>> > >>> Lineage Listener
>> > >>> Hi,
>> > >>> I notice that there is an experimental lineage integration for Flink
>> > >>> with OpenLineage 

[jira] [Created] (FLINK-34503) Migrate JoinDeriveNullFilterRule

2024-02-22 Thread Jacky Lau (Jira)
Jacky Lau created FLINK-34503:
-

 Summary: Migrate JoinDeriveNullFilterRule
 Key: FLINK-34503
 URL: https://issues.apache.org/jira/browse/FLINK-34503
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: Jacky Lau
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34502) Support calculating network memory for forward and rescale edge

2024-02-22 Thread Rui Fan (Jira)
Rui Fan created FLINK-34502:
---

 Summary: Support calculating network memory for forward and 
rescale edge
 Key: FLINK-34502
 URL: https://issues.apache.org/jira/browse/FLINK-34502
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Rui Fan
Assignee: Rui Fan


This is a follow-up Jira of FLINK-34471.

FLINK-34471 assumed all connection types are ALL_TO_ALL. This Jira will 
optimize that to save some network memory for forward and rescale connections.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34501) In AsyncWaitOperator, ResultHandler.completeExceptionally() triggers task/job restarts after max retry is reached

2024-02-22 Thread Dinesh (Jira)
Dinesh created FLINK-34501:
--

 Summary: In AsyncWaitOperator, 
ResultHandler.completeExceptionally() triggers task/job restarts after max 
retry is reached
 Key: FLINK-34501
 URL: https://issues.apache.org/jira/browse/FLINK-34501
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Dinesh


In AsyncWaitOperator, ResultHandler.completeExceptionally() triggers task/job 
restarts after the maximum retry count is reached.

Instead, can we allow the user to decide how to handle the exception, to avoid job 
restarts?
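For context, a user-side workaround that exists today is to handle the failure inside 
the AsyncFunction itself and complete with a fallback element, so the operator never 
calls completeExceptionally() and therefore never restarts the job. The sketch below 
is illustrative only; the class name and the external lookup are assumptions, not 
taken from this ticket:
{code:java}
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Sketch: decide on failure handling in user code instead of failing the job. */
class LookupWithFallback extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> externalLookup(key)) // hypothetical external call
                .whenComplete((value, error) -> {
                    if (error != null) {
                        // User-decided handling instead of completeExceptionally().
                        resultFuture.complete(Collections.singletonList("FALLBACK"));
                    } else {
                        resultFuture.complete(Collections.singletonList(value));
                    }
                });
    }

    private String externalLookup(String key) {
        return key.toUpperCase(); // placeholder for a real remote call
    }
}
{code}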



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources

2024-02-22 Thread SuDewei (Jira)
SuDewei created FLINK-34500:
---

 Summary: Release Testing: Verify FLINK-33261 Support Setting 
Parallelism for Table/SQL Sources
 Key: FLINK-34500
 URL: https://issues.apache.org/jira/browse/FLINK-34500
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Parent, Table SQL / API
Affects Versions: 1.19.0
Reporter: SuDewei
 Fix For: 1.19.0


This issue aims to verify 
[FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].
Volunteers can verify it by following the [doc 
changes|https://github.com/apache/flink/pull/24234]. Since currently only the 
pre-defined DataGen connector and user-defined connectors support setting 
source parallelism, volunteers can verify it through the DataGen connector. The 
basic steps include:
1. Start a Flink cluster and submit a Flink SQL job to the cluster.
2. In this Flink job, use the DataGen SQL connector to generate data.
3. Specify the parameter scan.parallelism in the DataGen connector options as a 
user-defined parallelism instead of the default parallelism (see the sketch below).
4. Observe whether the parallelism of the source has changed on the job graph 
in the Flink Application UI, and whether the shuffle mode is correct.
If everything is normal, you will see that the parallelism of the source 
operator is indeed different from that of the downstream operators, and the 
shuffle mode is rebalance by default.
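A minimal Flink SQL sketch for steps 2-3; the table name and field list are 
illustrative, only the scan.parallelism option is the feature under test:
{code:sql}
CREATE TABLE gen_source (
    id INT,
    name STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'scan.parallelism' = '3'  -- user-defined source parallelism
);

-- Any downstream operator running with the default parallelism forces a rebalance
-- shuffle after the source, which is what step 4 verifies in the UI.
SELECT * FROM gen_source;
{code}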



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-connector-parent 1.1.0 release candidate #2

2024-02-22 Thread Etienne Chauchot

Thanks everyone for your vote.

So far we are one binding vote short for the release to pass.

Best

Etienne

Le 20/02/2024 à 09:20, Sergey Nuyanzin a écrit :

Thanks for driving this, Etienne!

+1 (non-binding)

- Verified checksum and signature
- Verified pom
- Built from source
- Verified no binaries
- Checked staging repo on Maven central
- Checked source code tag
- Reviewed web PR


One thing (probably minor) I noticed is that the artifacts (uploaded to Nexus)
are built with JDK 11, while usually they should be built with JDK 8.
Since there are no jars, I think it should be OK.

On Tue, Feb 20, 2024 at 9:19 AM Hang Ruan  wrote:


+1 (non-binding)

- verified checksum and signature
- checked Github release tag
- checked release notes
- verified no binaries in source
- reviewed the web PR

Best,
Hang

Leonard Xu wrote on Tue, Feb 20, 2024 at 14:26:


+1 (binding)

- verified signatures
- verified hashsums
- built from source code succeeded
- checked Github release tag
- checked release notes
- reviewed all Jira tickets have been resolved
- reviewed the web PR

Best,
Leonard



On Feb 20, 2024 at 11:14 AM, Rui Fan <1996fan...@gmail.com> wrote:

Thanks for driving this, Etienne!

+1 (non-binding)

- Verified checksum and signature
- Verified pom content
- Build source on my Mac with jdk8
- Verified no binaries in source
- Checked staging repo on Maven central
- Checked source code tag
- Reviewed web PR

Best,
Rui

On Tue, Feb 20, 2024 at 10:33 AM Qingsheng Ren wrote:

Thanks for driving this, Etienne!

+1 (binding)

- Checked release note
- Verified checksum and signature
- Verified pom content
- Verified no binaries in source
- Checked staging repo on Maven central
- Checked source code tag
- Reviewed web PR
- Built Kafka connector from source with parent pom in staging repo

Best,
Qingsheng

On Tue, Feb 20, 2024 at 1:34 AM Etienne Chauchot <echauc...@apache.org> wrote:


Hi everyone,
Please review and vote on the release candidate #2 for the version
1.1.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:

* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2], which 
are signed with the key with fingerprint D1A76BA19D6294DD0033F6843A019F0B8DD163EA [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag v1.1.0-rc2 [5],
* website pull request listing the new release [6].
* confluence wiki: connector parent upgrade to version 1.1.0 that will be validated 
after the artifact is released (there is no PR mechanism on the wiki) [7]

The vote will be open for at least 72 hours. It is adopted by majority approval, 
with at least 3 PMC affirmative votes.

Thanks,
Etienne

[1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353442
[2] https://dist.apache.org/repos/dist/dev/flink/flink-connector-parent-1.1.0-rc2
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1707
[5] https://github.com/apache/flink-connector-shared-utils/releases/tag/v1.1.0-rc2
[6] https://github.com/apache/flink-web/pull/717
[7] https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development




[ANNOUNCE] Apache flink-connector-jdbc 3.1.2 released

2024-02-22 Thread Sergey Nuyanzin
The Apache Flink community is very happy to announce the release of Apache
flink-connector-jdbc 3.1.2. This release is compatible with
Apache Flink 1.16, 1.17 and 1.18.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354088

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-02-22 Thread David Radley
Hi Kevin,
Some thoughts on this.
I suggested an Apicurio registry format on the dev list and was advised to 
raise a FLIP for it; I suggest the same would apply here (or the alternative 
to FLIPs if you cannot raise one). I am prototyping an Avro Apicurio format, 
prior to raising the FLIP, and noticed that readSchema in the SchemaCoder 
only takes a byte array, but I need to pass down the Kafka headers (where the 
Apicurio globalId identifying the schema lives).

I assume:

  *   For the Confluent Protobuf format you would extend the Protobuf format to 
drive some Schema Registry logic for Protobuf (similar to the way Avro does it), 
where the magic byte + schema id can be obtained and the schema looked up using 
the Confluent Schema Registry (see the sketch after this list).
  *   It would be good if any protobuf format enhancements for Schema 
registries pass down the Kafka headers (I am thinking as a Map 
for Avro) as well as the message payload so Apicurio registry could work with 
this.
  *   It would make sense to have the Confluent schema lookup in common code, 
which is part of the SchemaCoder readSchema  logic.
  *   I assume the ProtobufSchemaCoder readSchema would return a Protobuf 
Schema object.
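To make the magic byte / schema id point concrete, here is a rough sketch of how the
Confluent wire format prefix is typically read before the registry lookup. The class
and method names are illustrative assumptions, not existing Flink code:

import java.nio.ByteBuffer;

final class ConfluentWireFormat {
    // Confluent-framed records start with a 0x0 magic byte followed by a 4-byte
    // big-endian schema id; the payload follows. Confluent's Protobuf serializers
    // additionally insert message-index varints before the payload, which a real
    // decoder has to skip.
    private static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema id and leaves the buffer positioned after the prefix. */
    static int readSchemaId(ByteBuffer message) {
        if (message.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not Confluent-framed data");
        }
        return message.getInt();
    }
}

The schema id would then be resolved via the shared Schema Registry client code
mentioned in point 3 of the design quoted below.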



I also wondered whether these Kafka-only formats should be moved to the Kafka 
connector repo, or whether they might in the future be used outside Kafka – 
e.g. Avro/Protobuf files in a database.
   Kind regards, David.


From: Kevin Lam 
Date: Wednesday, 21 February 2024 at 18:51
To: dev@flink.apache.org 
Subject: [EXTERNAL] [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent 
Format
I would love to get some feedback from the community on this JIRA issue:
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34440

I am looking into creating a PR and would appreciate some review on the
approach.

In terms of design I think we can mirror the `debezium-avro-confluent` and
`avro-confluent` formats already available in Flink:

   1. `protobuf-confluent` format which uses DynamicMessage for encoding and
   decoding.
  - For encoding the Flink RowType will be used to dynamically create a
  Protobuf Schema and register it with the Confluent Schema
Registry. It will
  use the same schema to construct a DynamicMessage and serialize it.
  - For decoding, the schema will be fetched from the registry and use
  DynamicMessage to deserialize and convert the Protobuf object to a Flink
  RowData.
  - Note: here there is no external .proto file
   2. `debezium-protobuf-confluent` format which unpacks the Debezium envelope
   and collects the appropriate UPDATE_BEFORE, UPDATE_AFTER, INSERT, DELETE
   events.
  - We may be able to refactor and reuse code from the existing
  DebeziumAvroDeserializationSchema + DebeziumAvroSerializationSchema, since
  the deser logic is largely delegated and these schemas are mainly concerned
  with handling the Debezium envelope.
   3. Move the Confluent Schema Registry client code to a separate Maven
   module, flink-formats/flink-confluent-common, and extend it to support
   ProtobufSchemaProvider.


Does anyone have any feedback or objections to this approach?

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


Re: Temporal join on rolling aggregate

2024-02-22 Thread Gyula Fóra
Posting this to dev as well as it potentially has some implications on
development effort.

What seems to be the problem here is that we cannot control/override
Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
cannot create a PRIMARY KEY on the view but I think the temporal join also
should not require the PK, should we remove this limitation?

The general problem is the inflexibility of the timestamp/watermark
handling on query outputs, which makes this again impossible.

The workaround here can be to write the rolling aggregate to Kafka, read it
back again and join with that. The fact that this workaround is possible
actually highlights the need for more flexibility on the query/view side in
my opinion.

Has anyone else run into this issue and considered the proper solution to
the problem? Feels like it must be pretty common :)

Cheers,
Gyula




On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley 
wrote:

> Hi,
>
> I have been trying to write a temporal join in SQL on a rolling
> aggregate view. However, it does not work and throws:
>
> org.apache.flink.table.api.ValidationException: Event-Time Temporal Table
> Join requires both primary key and row time attribute in versioned table,
> but no row time attribute can be found.
>
> It seems that after the aggregation, the table loses the watermark and
> it's not possible to add one with the SQL API as it's a view.
>
> CREATE TABLE orders (
> order_id INT,
> price DECIMAL(6, 2),
> currency_id INT,
> order_time AS NOW(),
> WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.order_id.kind' = 'sequence',
> 'fields.order_id.start' = '1',
> 'fields.order_id.end' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TABLE currency_rates (
> currency_id INT,
> conversion_rate DECIMAL(4, 3),
> PRIMARY KEY (currency_id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TEMPORARY VIEW max_rates AS (
> SELECT
> currency_id,
> MAX(conversion_rate) AS max_rate
> FROM currency_rates
> GROUP BY currency_id
> );
>
> CREATE TEMPORARY VIEW temporal_join AS (
> SELECT
> order_id,
> max_rates.max_rate
> FROM orders
>  LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
>  ON orders.currency_id = max_rates.currency_id
> );
>
> SELECT * FROM temporal_join;
>
> Am I missing something? What would be a good starting point to address
> this?
>
> Thanks in advance,
> Sébastien Chevalley


[jira] [Created] (FLINK-34499) Configuration#toString should hide sensitive values

2024-02-22 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-34499:


 Summary: Configuration#toString should hide sensitive values
 Key: FLINK-34499
 URL: https://issues.apache.org/jira/browse/FLINK-34499
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Chesnay Schepler
 Fix For: 1.20.0


Time and time again people log the entire Flink configuration for no reason, 
risking that sensitive values are logged in plain text.

We should make this harder by changing {{Configuration#toString}} to 
automatically hide sensitive values, for example like this:

{code}
@Override
public String toString() {
    return ConfigurationUtils.hideSensitiveValues(
                    this.confData.entrySet().stream()
                            .collect(
                                    Collectors.toMap(
                                            Map.Entry::getKey,
                                            entry -> entry.getValue().toString())))
            .toString();
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34498) GSFileSystemFactory logs full Flink config

2024-02-22 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-34498:


 Summary: GSFileSystemFactory logs full Flink config
 Key: FLINK-34498
 URL: https://issues.apache.org/jira/browse/FLINK-34498
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.18.1
Reporter: Chesnay Schepler
 Fix For: 1.19.0, 1.18.2, 1.20.0


This can cause secrets from the config to be logged.
{code}
@Override
public void configure(Configuration flinkConfig) {
    LOGGER.info("Configuring GSFileSystemFactory with Flink configuration {}", flinkConfig);
{code}
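A possible mitigation, sketched under the assumption that the configuration still 
needs to be logged at all (this is a sketch, not the committed fix):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.slf4j.Logger;

class RedactedConfigLogging {
    // Log only a redacted view of the configuration instead of the raw object.
    static void logRedactedConfig(Logger logger, Configuration flinkConfig) {
        logger.info(
                "Configuring GSFileSystemFactory with Flink configuration {}",
                ConfigurationUtils.hideSensitiveValues(flinkConfig.toMap()));
    }
}
{code}
Alternatively, FLINK-34499 proposes making Configuration#toString itself hide 
sensitive values, which would make the original log statement safe.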



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] Apache Bahir retired

2024-02-22 Thread Ferenc Csaky
Hello devs,

Just saw that the Bahir project is retired [1]. Any plans on what's happening 
with the Flink connectors that were part of this project? We specifically use 
the Kudu connector and integrate it into our platform at Cloudera, so we would be 
okay with maintaining it. Would it be possible to carry it over as a separate 
connector repo under the Apache umbrella, similarly to how it happened with the 
external connectors previously?

Thanks,
Ferenc

[jira] [Created] (FLINK-34497) Avoid using system classloader in SerializedThrowableDeserializer

2024-02-22 Thread jrthe42 (Jira)
jrthe42 created FLINK-34497:
---

 Summary: Avoid using system classloader in 
SerializedThrowableDeserializer  
 Key: FLINK-34497
 URL: https://issues.apache.org/jira/browse/FLINK-34497
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.18.1, 1.17.2
Reporter: jrthe42


SerializedThrowableDeserializer currently uses 
`ClassLoader.getSystemClassLoader()` when deserializing `SerializedThrowable`. 
But when using flink-client in systems like Spring Boot, we get exceptions 
like this:
{code:java}
java.lang.ClassNotFoundException: org.apache.flink.util.SerializedThrowable
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:467)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at 
java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034)
at 
java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224)
at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
at 
org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer.deserialize(SerializedThrowableDeserializer.java:69)
at 
org.apache.flink.runtime.rest.messages.json.JobResultDeserializer.deserialize(JobResultDeserializer.java:106)
at 
org.apache.flink.runtime.rest.messages.json.JobResultDeserializer.deserialize(JobResultDeserializer.java:50)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:564)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:439)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4706)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2948)
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:635)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$6(RestClient.java:626)
at 
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
{code}
This is because Spring Boot uses [the Executable Jar 
Format|https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html#appendix.executable-jar.restrictions],
 which contains nested JARs; the system classloader is not able to load 
classes in the nested jars, which leads to the ClassNotFoundException. We should 
use the current thread's context classloader instead.
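A rough sketch of the suggested change (illustrative only; the class and method names 
below are assumptions, not the actual patch):
{code:java}
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedThrowable;

import java.io.IOException;

/** Sketch: resolve classes via the context classloader instead of the system one. */
final class SerializedThrowableDecoding {

    static SerializedThrowable decode(byte[] serializedException)
            throws IOException, ClassNotFoundException {
        // Prefer the context classloader so nested-jar setups (e.g. Spring Boot
        // executable jars) can resolve org.apache.flink.util.SerializedThrowable.
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            // Fall back to the classloader that loaded the Flink classes themselves.
            classLoader = SerializedThrowable.class.getClassLoader();
        }
        return InstantiationUtil.deserializeObject(serializedException, classLoader);
    }
}
{code}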



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34496) Classloading deadlock between ExecNodeMetadataUtil and JsonSerdeUtil

2024-02-22 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-34496:


 Summary: Classloading deadlock between ExecNodeMetadataUtil and 
JsonSerdeUtil
 Key: FLINK-34496
 URL: https://issues.apache.org/jira/browse/FLINK-34496
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.1
Reporter: Chesnay Schepler
 Fix For: 1.19.0, 1.18.2, 1.20.0


This is a fun one!

ExecNodeMetadataUtil and JsonSerdeUtil have a circular dependency in their 
static initialization, which can cause a classloading lockup when two threads 
run the class initialization of the two classes at the same time, because each 
thread holds the initialization lock of the class it is initializing while 
waiting for the other.

{code}
Feb 22 00:31:58 "ForkJoinPool-3-worker-11" #25 daemon prio=5 os_prio=0 
cpu=219.87ms elapsed=995.99s tid=0x7ff11c50e000 nid=0xf0fc in Object.wait() 
 [0x7ff12a4f3000]
Feb 22 00:31:58java.lang.Thread.State: RUNNABLE
Feb 22 00:31:58 at 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createFlinkTableJacksonModule(JsonSerdeUtil.java:133)
Feb 22 00:31:58 at 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.<clinit>(JsonSerdeUtil.java:111)

Feb 22 00:31:58 "ForkJoinPool-3-worker-7" #23 daemon prio=5 os_prio=0 
cpu=54.83ms elapsed=996.00s tid=0x7ff11c50c000 nid=0xf0fb in Object.wait()  
[0x7ff12a5f4000]
Feb 22 00:31:58java.lang.Thread.State: RUNNABLE
Feb 22 00:31:58 at 
org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil.addToLookupMap(ExecNodeMetadataUtil.java:235)
Feb 22 00:31:58 at 
org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil.<clinit>(ExecNodeMetadataUtil.java:156)
{code}
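For readers unfamiliar with this failure mode, a minimal, generic illustration of the 
pattern (the names are made up; this is not the Flink code):
{code:java}
/** Two classes that touch each other during static initialization. */
final class InitDeadlockDemo {

    static class A {
        static {
            B.touch(); // runs while the current thread holds A's class-init lock
        }
        static void touch() {}
    }

    static class B {
        static {
            A.touch(); // runs while the current thread holds B's class-init lock
        }
        static void touch() {}
    }

    public static void main(String[] args) {
        // If these two threads start the two initializations concurrently, each one
        // ends up waiting for the lock the other is holding: a classic init deadlock.
        new Thread(A::touch).start();
        new Thread(B::touch).start();
    }
}
{code}
The usual fix is to break the cycle, e.g. by moving one side's static state behind 
lazy initialization.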



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Apache Flink Committer - Jiabao Sun

2024-02-22 Thread Weihua Hu
Congratulations, Jiabao!

Best,
Weihua


On Thu, Feb 22, 2024 at 10:34 AM Jingsong Li  wrote:

> Congratulations! Well deserved!
>
> On Wed, Feb 21, 2024 at 4:36 PM Yuepeng Pan  wrote:
> >
> > Congratulations~ :)
> >
> > Best,
> > Yuepeng Pan
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 2024-02-21 09:52:17, "Hongshun Wang" wrote:
> > >Congratulations, Jiabao :)
> > >Congratulations Jiabao!
> > >
> > >Best,
> > >Hongshun
> > >Best regards,
> > >
> > >Weijie
> > >
> > >On Tue, Feb 20, 2024 at 2:19 PM Runkang He  wrote:
> > >
> > >> Congratulations Jiabao!
> > >>
> > >> Best,
> > >> Runkang He
> > >>
> > >> > Jane Chan wrote on Tue, Feb 20, 2024 at 14:18:
> > >>
> > >> > Congrats, Jiabao!
> > >> >
> > >> > Best,
> > >> > Jane
> > >> >
> > >> > On Tue, Feb 20, 2024 at 10:32 AM Paul Lam 
> wrote:
> > >> >
> > >> > > Congrats, Jiabao!
> > >> > >
> > >> > > Best,
> > >> > > Paul Lam
> > >> > >
> > >> > > > On Feb 20, 2024 at 10:29, Zakelly Lan wrote:
> > >> > > >
> > >> > > >> Congrats! Jiabao!
> > >> > >
> > >> > >
> > >> >
> > >>
>