Re: [DISCUSS] FLIP-546: Introduce CREATE OR ALTER for Materialized Tables

2025-09-07 Thread Ramin Gharib
Hi Ron,

Thanks again for the excellent feedback and for bringing FLIP-492 [1] into
the discussion. I want to address your points on CREATE OR REPLACE vs. CREATE
OR ALTER and the SQL standard.

*1. Semantics of REPLACE vs. ALTER*

You’re right that FLIP-492 [1] proposed CREATE OR REPLACE. However, I
believe the semantics of ALTER are a much better fit for an object as
complex as a Materialized Table, which has a running job, state, and
physical data associated with it.

Interestingly, this exact distinction is recognized by other modern data
platforms. For example, *Snowflake supports both CREATE OR REPLACE TABLE
and CREATE OR ALTER TABLE as distinct commands with different use cases
[2].*

   - Their documentation for CREATE OR ALTER TABLE explicitly states that it
   *"creates a table if it doesn’t exist, or alters it according to the table
   definition... existing data in the table is preserved when possible."* This
   "modify in place" semantic is precisely what this FLIP aims to achieve.

   - Conversely, their docs describe CREATE OR REPLACE as *"the equivalent of
   using DROP TABLE on the existing table and then creating a new table..."*
   This "drop and recreate" semantic is a destructive action that is not
   suitable for a stateful object like a Flink Materialized Table, where the
   goal is to evolve the pipeline, not destroy it.

This is particularly relevant for Flink, as we already have an ALTER
MATERIALIZED TABLE ... AS command that defines the "modify in-place" behavior.
My proposal is that the ALTER path of CREATE OR ALTER should be implemented by
delegating directly to this existing logic, which ensures behavioral
consistency. Using REPLACE would instead introduce "drop and recreate"
semantics that conflict with our current implementation.
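
To make the intended delegation concrete, here is a minimal Table API sketch
(illustrative only: CREATE OR ALTER MATERIALIZED TABLE is the syntax proposed
in this FLIP and does not exist in Flink today, materialized table DDL is
normally submitted through the SQL Gateway, and the table name, columns, and
freshness value below are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateOrAlterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Existing DDL (FLIP-492): modify the query of an existing
        // materialized table in place, keeping its pipeline and state.
        tEnv.executeSql(
                "ALTER MATERIALIZED TABLE my_mt "
                        + "AS SELECT id, SUM(amount) AS total FROM orders GROUP BY id");

        // Proposed DDL (this FLIP): create the table if it does not exist,
        // otherwise delegate to the ALTER path above instead of dropping
        // and recreating the table.
        tEnv.executeSql(
                "CREATE OR ALTER MATERIALIZED TABLE my_mt "
                        + "FRESHNESS = INTERVAL '1' MINUTE "
                        + "AS SELECT id, SUM(amount) AS total FROM orders GROUP BY id");
    }
}

The second statement would behave as a plain CREATE when my_mt does not exist,
and otherwise follow exactly the same code path as the first statement.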

*2. Forward-Looking Resilience*

The ALTER semantic is also more forward-looking. As Flink's evolution
capabilities become more sophisticated—potentially including complex
reprocessing strategies that retain historical data—the concept of
'altering' a pipeline is more fitting than 'replacing' it. REPLACE suggests
a simple, destructive action, whereas ALTER provides a more resilient
foundation for nuanced, state-preserving modifications in the future.

*3. The SQL Standard*

You've raised an important point about the SQL standard. A review shows
that neither CREATE OR REPLACE nor CREATE OR ALTER is part of the formal
ANSI/ISO SQL standard. They are both widely adopted, vendor-specific
extensions.

   - *CREATE OR REPLACE* was popularized by systems like PostgreSQL (for
   views/functions) [3] and Oracle [4].

   - *CREATE OR ALTER* was popularized by Microsoft SQL Server [5] and is now
   a key feature in Snowflake [6].

Since neither is formally "standard," our choice should be guided by which
one provides the most clarity and consistency for Flink users. The fact
that a major platform like Snowflake has implemented both as distinct
commands reinforces the idea that the industry sees value in their
different semantics.

*4. Preventing User Confusion*

I believe CREATE OR ALTER will actually be *less confusing* for Flink users
precisely because it maps directly to the existing commands they already
know. A user familiar with ALTER MATERIALIZED TABLE ... AS will immediately
understand what the ALTER part of the new command does.

Given that the implementation for FLIP-492 [1] is not yet complete (
FLINK-36995 [7] is still open), now is a good time to choose (or define)
the syntax with the clearest and safest semantics before any code is merged.

In summary, I believe CREATE OR ALTER provides clearer, non-destructive
semantics that are more consistent with Flink's existing DDL for
Materialized Tables, making it the superior choice for both current
functionality and future resilience.

Thanks again for the great discussion points. I look forward to hearing
your thoughts and those of the wider community.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-492%3A+Support+Query+Modifications+for+Materialized+Tables
[2] https://docs.snowflake.com/en/sql-reference/sql/create-table#usage-notes
[3] https://www.postgresql.org/docs/current/sql-createview.html
[4] https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/create-view.html
[5]
https://learn.microsoft.com/en-us/sql/t-sql/statements/create-view-transact-sql?view=sql-server-ver17#or-alter
[6]
https://docs.snowflake.com/en/sql-reference/sql/create-table#label-create-or-alter-table-syntax
[7] https://issues.apache.org/jira/browse/FLINK-36995

Best,

Ramin

On Thu, Sep 4, 2025 at 1:54 PM Ron Liu wrote:

> Hi, Ramin
>
> In FLIP-492[1], we introduced the `CREATE OR REPLACE MATERIALIZED TABLE`
> syntax to support modifying materialized tables. Can we extend this syntax
> to achieve the functionality you need, such as introducing clause
> parameters to determine whether to replace the entire table or just some of
> its attributes?
>
> Regarding the `CREATE OR ALTER TABLE` syntax, I couldn't find it in the SQL
> stan

[jira] [Created] (FLINK-38327) NPE during recovery from file-merged checkpoint after FO

2025-09-07 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-38327:
---

 Summary: NPE during recovery from file-merged checkpoint after FO
 Key: FLINK-38327
 URL: https://issues.apache.org/jira/browse/FLINK-38327
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.2, 2.0.0, 2.1.0
Reporter: Zakelly Lan
Assignee: Zakelly Lan


Report: https://lists.apache.org/thread/yzcxqdfsfdykgzdfkovf65jbwy4j6g0y

This job is running on Kubernetes using the Apache Flink Kubernetes operator. 
This NullPointerException happened during a job restart after one of the 
TaskManagers restarted because the underlying node running the TaskManager pod 
was scaled down for maintenance. There was no rescaling or parallelism change.

The job is quite large due to heavy input traffic + state size:
* 2100 parallelism
* taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
* RocksDBStateBackend used for state management. Checkpoints/savepoints are 
written to S3 in AWS.
* According to the Flink Checkpoint UI, the full state size is ~600GB


{code:java}
execution.checkpointing.file-merging.enabled: true
execution.checkpointing.file-merging.max-file-size: 32m
execution.checkpointing.timeout: "10min"
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.min-pause: "2min"
execution.checkpointing.interval: "2min"
{code}

{code:java}
java.lang.NullPointerException
    at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
    at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
    at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
    at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipelin

[jira] [Created] (FLINK-38329) Add labels to PRs showing the targeted version

2025-09-07 Thread david radley (Jira)
david radley created FLINK-38329:


 Summary: Add labels to PRs showing the targeted version  
 Key: FLINK-38329
 URL: https://issues.apache.org/jira/browse/FLINK-38329
 Project: Flink
  Issue Type: Improvement
Reporter: david radley


Amend the community review GitHub Action script to add the targeted branch of 
the PR as a label. This will make it easy to distinguish between backport PRs 
that have the same title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-38318) Add python autoformatter to pyflink

2025-09-07 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-38318:
---

 Summary: Add python autoformatter to pyflink 
 Key: FLINK-38318
 URL: https://issues.apache.org/jira/browse/FLINK-38318
 Project: Flink
  Issue Type: Bug
Reporter: Sergey Nuyanzin


Currently we have Spotless; however, there is also Python code, and its style 
validation can lead to failures like:

{noformat}
Sep 03 08:11:52 ./pyflink/table/catalog.py:163:101: E501 line too long (107 > 
100 characters)
Sep 03 08:11:52 ./pyflink/table/table_environment.py:624:101: E501 line too 
long (131 > 100 characters)
Sep 03 08:11:52 ./pyflink/table/table_environment.py:626:101: E501 line too 
long (122 > 100 characters)
{noformat}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=69581&view=l[…]da-0f718fb86602&t=0a887b12-e4d0-528d-2191-73222db51fda&l=20115

These kinds of errors require manual resolution.

For now I see two possible improvements:
# Add a Python autoformatter and allow running it locally, similar to {{./mvnw 
spotless:apply}}
# Run the style validation before all the tests; currently we have to wait half 
an hour or more for the tests, and only after that the validation might fail, 
as in the CI example above



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split assignment

2025-09-07 Thread Leonard Xu
Thanks Hongshun and Becket for the deep discussion. 

I only have one comment on the API design:

> Deprecate the old addSplitsBack method and use an addSplitsBack with an 
> isReportedByReader parameter instead, because the enumerator can apply 
> different reassignment policies based on the context.

Could we introduce a new method like addSplitsBackOnRecovery with a default 
implementation? This way we can provide better backward compatibility and also 
make it easier for developers to understand.
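
Roughly, I am thinking of something like the sketch below (the method name, the 
default behavior, and the simplified interface are only for illustration; it is 
not the real SplitEnumerator definition):

import java.util.List;

// Simplified, illustrative enumerator interface; not the actual
// org.apache.flink.api.connector.source.SplitEnumerator.
public interface EnumeratorSketch<SplitT> {

    // Existing contract: splits handed back to the enumerator (e.g. on reader failure).
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    // Suggested addition: called only for splits reported back by readers on recovery.
    // The default falls back to the existing behavior, so current enumerator
    // implementations keep compiling and behave exactly as before; enumerators
    // that want a different reassignment policy simply override this method.
    default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
        addSplitsBack(splits, subtaskId);
    }
}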

Best,
Leonard



> On Sep 3, 2025, at 20:26, Hongshun Wang wrote:
> 
> Hi Becket,
> 
> I think that's a great idea! I have added the 
> SupportSplitReassignmentOnRecovery interface in this FLIP. If a Source 
> implements this interface, it indicates that the source operator needs to 
> report splits to the enumerator and receive a reassignment. [1]
> 
> Best,
> Hongshun
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> 
> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin wrote:
>> Hi Hongshun,
>> 
>> I think the convention for such optional features in Source is via mix-in 
>> interfaces. So instead of adding a method to the SourceReader, maybe we 
>> should introduce an interface SupportSplitReassignmentOnRecovery with this 
>> method. If a Source implementation implements that interface, then the 
>> SourceOperator will check the desired behavior and act accordingly.
>> 
>> Thanks,
>> 
>> Jiangjie (Becket) Qin
>> 
>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang wrote:
>>> Hi devs,
>>> 
>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and 
>>> suggestions.
>>> 
>>> Best,
>>> Hongshun
>>> 
>>> 
 On Aug 13, 2025, at 14:23, Hongshun Wang wrote:
 
 Hi Becket,
 Thank you for your detailed feedback. The new contract makes good sense to 
 me and effectively addresses the issues I encountered at the beginning of 
 the design.
 That said, I recommend not reporting splits by default, primarily for 
 compatibility and practical reasons:
 >  For these reasons, we do not expect the Split objects to be huge, and 
 > we are not trying to design for huge Split objects either as they will 
 > have problems even today.
 Not all existing connectors match this rule.
 For example, in the MySQL CDC connector, a binlog split may contain hundreds 
 (or even more) snapshot split completion records. This state is large and is 
 currently transmitted incrementally through multiple BinlogSplitMetaEvent 
 messages. Since the binlog reader operates with single parallelism, reporting 
 the full split state on recovery could be inefficient or even infeasible.
 For such sources, it would be better to provide a mechanism to skip split 
 reporting during restart until they redesign and reduce the split size.
 Not all enumerators maintain unassigned splits in state.
 Some SplitEnumerator implementations (such as the Kafka connector) do not 
 track or persistently manage unassigned splits. Requiring them to handle 
 re-registration would add unnecessary complexity. Even if we implement it in 
 the Kafka connector, the Kafka connector is currently decoupled from the Flink 
 version, so we also need to make sure that older versions remain compatible.
 To address these concerns, I propose introducing a new method: boolean 
 SourceReader#shouldReassignSplitsOnRecovery() with a default 
 implementation returning false. This allows source readers to opt in to 
 split reassignment only when necessary. Since the new contract already 
 places the responsibility for split assignment on the enumerator, not 
 reporting splits by default is a safe and clean default behavior.
 
 
 I’ve updated the implementation and the FLIP accordingly [1]. It is quite a 
 big change. In particular, for the Kafka connector, we can now use a pluggable 
 SplitPartitioner to support different split assignment strategies (e.g., 
 default, round-robin).
 
 Could you please review it when you have a chance?
 
 Best,
 Hongshun
 
 [1] 
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
 
 On Sat, Aug 9, 2025 at 3:03 AM Becket Qin wrote:
> Hi Hongshun,
> 
> I am not too concerned about the transmission cost, because the full 
> split transmission has to happen in the initial assignment phase already. 
> And in the future, we probably want to also introduce some kind of 
> workload balance across source readers, e.g. based on the per-split 
> throughput or the per-source-reader workload in heterogeneous clusters. 
> For these reasons, we do not expect the Split objec

Subject: Feature Request: Support for Resetting Progress of a Specific Table in YAML-based CDC

2025-09-07 Thread L W
Dear Flink CDC Community,
My name is LW and I am a user of Flink CDC. I am currently using Flink CDC with 
a YAML configuration file to synchronize data from MySQL to Apache Paimon, and 
I am very impressed with its capabilities.
I am writing to propose a new feature that I believe would be a valuable 
addition. Currently, when synchronizing multiple tables defined in a single 
YAML file, it appears there is no way to reset the synchronization progress for 
a single, specific table without affecting the progress of the other tables.
For instance, if one of the target tables in Paimon needs to be re-initialized 
due to data corruption or business requirements, the current approach would 
require restarting the entire pipeline. This would cause all other tables to 
resynchronize from their beginning, which is inefficient and can be disruptive 
for the overall data flow.
Therefore, I would like to request the addition of a new feature that allows 
users to reset the progress for a specified table (or a selection of tables) 
and have it re-read from the beginning, while the synchronization for all other 
tables in the same job continues unaffected from their last recorded checkpoint.
This could potentially be implemented via a command-line interface, a REST API 
call to the running job, or through a dynamic configuration update.
I believe this feature would greatly enhance the flexibility and manageability 
of Flink CDC in production environments where targeted data reprocessing is 
often necessary.
Thank you for your time and for developing this great tool. I look forward to 
hearing your thoughts on this proposal.
Best regards,
LW