[jira] [Created] (FLINK-35177) Datagen examples in documentation do not compile

2024-04-19 Thread Sergei Morozov (Jira)
Sergei Morozov created FLINK-35177:
--

 Summary: Datagen examples in documentation do not compile
 Key: FLINK-35177
 URL: https://issues.apache.org/jira/browse/FLINK-35177
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Sergei Morozov


Currently, the examples look like this:

{code}
GeneratorFunction<Long, Long> generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.STRING);
{code}

The generator function returns a Long, but the DataGeneratorSource is 
parameterized with String, so the types do not match.

Either the generator function needs to be changed to return a string, or the 
source needs to use Long.
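A minimal, self-contained sketch of the type relationship (using a simplified 
stand-in for Flink's GeneratorFunction interface, not the real connector API) 
illustrates the first option, mapping the index to a String:

```java
public class DatagenTypeDemo {
    // Simplified stand-in for org.apache.flink.connector.datagen.source.GeneratorFunction<T, O>.
    @FunctionalInterface
    interface GeneratorFunction<T, O> {
        O map(T value);
    }

    public static void main(String[] args) {
        // The documented example uses index -> index, which produces a Long and
        // cannot feed a source declared with Types.STRING. Producing a String
        // makes the two type parameters line up:
        GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
        System.out.println(generatorFunction.map(42L)); // prints Number: 42
    }
}
```

Against the real connector, the corresponding change would be declaring the 
function as GeneratorFunction<Long, String> (or keeping Long output and using 
Types.LONG on the source side), matching the two fixes suggested above.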



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD

2024-04-19 Thread Gyula Fóra
Hey!

Regarding the question of initialSavepointPath and the new
flinkStateSnapshotReference object, I think we could simply keep an extra
field called path as part of the flinkStateSnapshotReference object.

Then the fields could be:
namespace, name, path

If path is defined, we would use that (to also support the simple way);
otherwise, we would use the referenced resource. I would still deprecate
the initialSavepointPath field in the jobSpec.
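As a hypothetical illustration (field names follow the proposal above but are
not final), the resulting jobSpec fragment could look like:

```yaml
spec:
  job:
    # Reference a FlinkStateSnapshot resource by namespace and name...
    flinkStateSnapshotReference:
      namespace: flink-jobs
      name: snapshot-2024-04-19
      # ...or, alternatively, set a raw path, which would take precedence
      # and replace the deprecated initialSavepointPath:
      # path: s3://my-bucket/savepoints/savepoint-abc123
```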

Regarding the Savepoint/Checkpoint vs FlinkStateSnapshot.
What we need:
 1. Easy way to list all state snapshots (to select latest)
 2. Easy way to reference a savepoint/checkpoint from a jobspec
 3. Differentiate state snapshot types (in some cases users may prefer to
use checkpoint/savepoint for certain upgrades) -> we should add a label or
something for easy selection
 4. Be able to delete savepoints (and checkpoints maybe)

I am personally slightly more in favor of having a single resource as that
ticks all the boxes, while having 2 separate resources will make both
listing and referencing harder. We would have to introduce the state type
in the reference (name + namespace would not be enough to uniquely identify
a state snapshot).

I wonder if I am missing any good argument against the single
FlinkStateSnapshot here.

Cheers,
Gyula


On Fri, Apr 19, 2024 at 9:09 PM Mate Czagany  wrote:

> Hi Robert and Thomas,
>
> Thank you for sharing your thoughts, I will try to address your questions
> and suggestions:
>
> 1. I would really love to hear others' input as well about separating the
> snapshot CRD into two different CRDs, one for savepoints and one for
> checkpoints. I think the main upside is that we would not need the
> mandatory savepoint or checkpoint field inside the spec. The two CRs could
> share the same status fields, and their specs would be different.
> I personally like both solutions, and would love to hear others' thoughts
> as well.
>
> 2. I agree with you that "completed" is not very clear, but I would
> suggest the name "alreadyExists". WDYT?
>
> 3. I think having a retry loop inside the operator does not add too much
> complexity to the FLIP. On failure, we check if we have reached the max
> retries. If we did, the state will be set to "FAILED", else it will be set
> to "TRIGGER_PENDING", causing the operator to retry the task. The "error"
> field will always be populated with the latest error. Kubernetes Jobs
> already has a similar field called "backoffLimit", maybe we could use the
> same name, with the same logic applied, WDYT?
> About error events, I think we should keep the "error" field, and upon
> successful snapshot, we clear it. I will add to the FLIP that there will be
> an event generated for each unsuccessful snapshot.
>
> 4. I really like the idea of having something like Pod Conditions, but I
> think it wouldn't add too much value here, because the only 2 stages
> important to the user are "Triggered" and "Completed", and those timestamps
> will already be included in the status field. I think it would make more
> sense to implement this if there were more lifecycle stages.
>
> 5. There will be a new field in JobSpec called
> "flinkStateSnapshotReference" to reference a FlinkStateSnapshot to restore
> from.
>
> > How do you see potential effects on API server performance wrt. number of
> objects vs mutations? Is the proposal more or less neutral in that regard?
>
> While I am not an expert in Kubernetes internals, my understanding is that
> for the api-server, editing an existing resource or creating a new one is
> not different performance-wise, because the whole resource will always be
> written to etcd anyway.
> Retrieving the savepoints from etcd will be different though for some
> use-cases, e.g. retrieving all snapshots for a specific FlinkDeployment
> would require the api-server to first retrieve every snapshot in a
> namespace from etcd, then filter them for that specific FlinkDeployment. I
> think this is a worst-case scenario, and it will be up to the user to
> optimize their queries via e.g. watch queries [1] or resourceVersions [2].
>
> > Does that mean one would have to create a FlinkStateSnapshot CR when
> starting a new deployment from savepoint? If so, that's rather complicated.
> I would prefer something more simple/concise and would rather
> keep initialSavepointPath
>
> Starting a job from a savepoint path will indeed be deprecated with this
> FLIP. I agree that it will be more complicated to restore from a savepoint
> in those cases, but if the user decides to move away from the deprecated
> savepoint mechanisms, every savepoint will result in a new
> FlinkStateSnapshot CR. So the only situation I expect this to be an
> inconvenience is when the user onboards a new Flink job to the operator.
> But I may not be thinking this through, so please let me know if you
> disagree.
>
> Thank you very much for your questions and suggestions!
>
> [1]
> https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
> [2]

Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD

2024-04-19 Thread Mate Czagany
Hi Robert and Thomas,

Thank you for sharing your thoughts, I will try to address your questions
and suggestions:

1. I would really love to hear others' input as well about separating the
snapshot CRD into two different CRDs, one for savepoints and one for
checkpoints. I think the main upside is that we would not need the
mandatory savepoint or checkpoint field inside the spec. The two CRs could
share the same status fields, and their specs would be different.
I personally like both solutions, and would love to hear others' thoughts
as well.

2. I agree with you that "completed" is not very clear, but I would suggest
the name "alreadyExists". WDYT?

3. I think having a retry loop inside the operator does not add too much
complexity to the FLIP. On failure, we check if we have reached the max
retries. If we did, the state will be set to "FAILED", else it will be set
to "TRIGGER_PENDING", causing the operator to retry the task. The "error"
field will always be populated with the latest error. Kubernetes Jobs
already has a similar field called "backoffLimit", maybe we could use the
same name, with the same logic applied, WDYT?
About error events, I think we should keep the "error" field, and upon
successful snapshot, we clear it. I will add to the FLIP that there will be
an event generated for each unsuccessful snapshot.
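The retry handling described in point 3 can be sketched as a small decision
function (a hypothetical sketch; names are illustrative and not part of the
FLIP):

```java
public class SnapshotRetryDemo {
    enum State { TRIGGER_PENDING, IN_PROGRESS, COMPLETED, FAILED }

    // Analogous to the Kubernetes Job "backoffLimit" field mentioned above.
    static final int BACKOFF_LIMIT = 3;

    // Decide the next state after a failed snapshot attempt: retry until the
    // limit is reached, then give up and mark the snapshot FAILED.
    static State onFailure(int failureCount) {
        return failureCount < BACKOFF_LIMIT ? State.TRIGGER_PENDING : State.FAILED;
    }

    public static void main(String[] args) {
        System.out.println(onFailure(1)); // TRIGGER_PENDING
        System.out.println(onFailure(3)); // FAILED
    }
}
```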

4. I really like the idea of having something like Pod Conditions, but I
think it wouldn't add too much value here, because the only 2 stages
important to the user are "Triggered" and "Completed", and those timestamps
will already be included in the status field. I think it would make more
sense to implement this if there were more lifecycle stages.

5. There will be a new field in JobSpec called
"flinkStateSnapshotReference" to reference a FlinkStateSnapshot to restore
from.

> How do you see potential effects on API server performance wrt. number of
objects vs mutations? Is the proposal more or less neutral in that regard?

While I am not an expert in Kubernetes internals, my understanding is that
for the api-server, editing an existing resource or creating a new one is
not different performance-wise, because the whole resource will always be
written to etcd anyway.
Retrieving the savepoints from etcd will be different though for some
use-cases, e.g. retrieving all snapshots for a specific FlinkDeployment
would require the api-server to first retrieve every snapshot in a
namespace from etcd, then filter them for that specific FlinkDeployment. I
think this is a worst-case scenario, and it will be up to the user to
optimize their queries via e.g. watch queries [1] or resourceVersions [2].

> Does that mean one would have to create a FlinkStateSnapshot CR when
starting a new deployment from savepoint? If so, that's rather complicated.
I would prefer something more simple/concise and would rather
keep initialSavepointPath

Starting a job from a savepoint path will indeed be deprecated with this
FLIP. I agree that it will be more complicated to restore from a savepoint
in those cases, but if the user decides to move away from the deprecated
savepoint mechanisms, every savepoint will result in a new
FlinkStateSnapshot CR. So the only situation I expect this to be an
inconvenience is when the user onboards a new Flink job to the operator.
But I may not be thinking this through, so please let me know if you
disagree.

Thank you very much for your questions and suggestions!

[1]
https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
[2]
https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions

Regards,
Mate

Thomas Weise wrote (on Fri, Apr 19, 2024 at 11:31):

> Thanks for the proposal.
>
> How do you see potential effects on API server performance wrt. number of
> objects vs mutations? Is the proposal more or less neutral in that regard?
>
> Thanks for the thorough feedback Robert.
>
> Couple more questions below.
>
> -->
>
> On Fri, Apr 19, 2024 at 5:07 AM Robert Metzger 
> wrote:
>
> > Hi Mate,
> > thanks for proposing this, I'm really excited about your FLIP. I hope my
> > questions make sense to you:
> >
> > 1. I would like to discuss the "FlinkStateSnapshot" name and the fact
> that
> > users have to use either the savepoint or checkpoint spec inside the
> > FlinkStateSnapshot.
> > Wouldn't it be more intuitive to introduce two CRs:
> > FlinkSavepoint and FlinkCheckpoint
> > Ideally they can internally share a lot of code paths, but from a user's
> > perspective, the abstraction is much clearer.
> >
>
> There are probably pros and cons either way. For example it is desirable to
> have a single list of state snapshots when looking for the initial
> savepoint for a new deployment etc.
>
>
> >
> > 2. I also would like to discuss SavepointSpec.completed, as this name is
> > not intuitive to me. How about "ignoreExisting"?
> >
> > 3. The FLIP proposal seems to leave error handling to the user, e.g. when
> > you create a 

[jira] [Created] (FLINK-35176) Support property authentication connection for JDBC catalog & dynamic table

2024-04-19 Thread RocMarshal (Jira)
RocMarshal created FLINK-35176:
--

 Summary: Support property authentication connection for JDBC 
catalog & dynamic table
 Key: FLINK-35176
 URL: https://issues.apache.org/jira/browse/FLINK-35176
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: RocMarshal






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines

2024-04-19 Thread Martijn Visser
+1 (binding)

On Fri, Apr 19, 2024 at 10:07 AM Yuepeng Pan  wrote:

> +1(non-binding)
>
> Best,
> Yuepeng Pan
>
> At 2024-04-19 15:22:04, "gongzhongqiang" 
> wrote:
> >+1(non-binding)
> >
> >
> >Best,
> >
> >Zhongqiang Gong
> >
> >Ron liu wrote on Wed, Apr 17, 2024 at 14:28:
> >
> >> Hi Dev,
> >>
> >> Thank you to everyone for the feedback on FLIP-435: Introduce a New
> >> Materialized Table for Simplifying Data Pipelines[1][2].
> >>
> >> I'd like to start a vote for it. The vote will be open for at least 72
> >> hours unless there is an objection or not enough votes.
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines
> >> [2] https://lists.apache.org/thread/c1gnn3bvbfs8v1trlf975t327s4rsffs
> >>
> >> Best,
> >> Ron
> >>
>


[jira] [Created] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3

2024-04-19 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35175:
---

 Summary: HadoopDataInputStream can't compile with Hadoop 3.2.3
 Key: FLINK-35175
 URL: https://issues.apache.org/jira/browse/FLINK-35175
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.0
Reporter: Ryan Skraba


Unfortunately, FLINK-35045 introduced a use of 
[PREADBYTEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182],
 which was only added in Hadoop releases 
[3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]
 and 
[2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72].

It doesn't exist in flink.hadoop.version 
[3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java],
 which we are using in end-to-end tests.
{code:java}
00:23:55.093 [ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) 
on project flink-hadoop-fs: Compilation failure: Compilation failure: 
00:23:55.093 [ERROR] 
/home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63]
 cannot find symbol
00:23:55.094 [ERROR]   symbol:   variable READBYTEBUFFER
00:23:55.094 [ERROR]   location: interface 
org.apache.hadoop.fs.StreamCapabilities
00:23:55.094 [ERROR] 
/home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63]
 cannot find symbol
00:23:55.094 [ERROR]   symbol:   variable PREADBYTEBUFFER
00:23:55.094 [ERROR]   location: interface 
org.apache.hadoop.fs.StreamCapabilities
00:23:55.094 [ERROR] 
/home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43]
 incompatible types: long cannot be converted to 
org.apache.hadoop.io.ByteBufferPool
00:23:55.094 [ERROR] -> [Help 1] {code}
* 1.20 compile_cron_hadoop313 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012&view=logs&j=87489130-75dc-54e4-1f45-80c30aa367a3&t=73da6d75-f30d-5d5a-acbe-487a9dcff678&l=3630
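One way to stay source-compatible with older Hadoop versions is to avoid
compile-time references to the missing constants, e.g. by probing for the
field reflectively and falling back to the capability string literal. A
self-contained sketch of that pattern (the nested interface is a stand-in
for org.apache.hadoop.fs.StreamCapabilities, and the literals are
assumptions, not taken from the eventual Flink fix):

```java
public class CapabilityProbeDemo {
    // Stand-in for org.apache.hadoop.fs.StreamCapabilities as shipped in
    // Hadoop 3.2.x, which has READAHEAD but no PREADBYTEBUFFER constant.
    interface StreamCapabilities {
        String READAHEAD = "in:readahead";
    }

    // Look up a constant by name; fall back to a known literal when the
    // constant does not exist in the Hadoop version on the classpath.
    static String capability(String constantName, String fallbackLiteral) {
        try {
            return (String) StreamCapabilities.class.getField(constantName).get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return fallbackLiteral;
        }
    }

    public static void main(String[] args) {
        System.out.println(capability("READAHEAD", "in:readahead"));             // resolved from the interface
        System.out.println(capability("PREADBYTEBUFFER", "in:preadbytebuffer")); // falls back to the literal
    }
}
```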



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35174) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink RabbitMQ connector

2024-04-19 Thread Danny Cranmer (Jira)
Danny Cranmer created FLINK-35174:
-

 Summary: Bump org.apache.commons:commons-compress from 1.25.0 to 
1.26.1 for Flink RabbitMQ connector
 Key: FLINK-35174
 URL: https://issues.apache.org/jira/browse/FLINK-35174
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / AWS
Reporter: Danny Cranmer
Assignee: Danny Cranmer
 Fix For: aws-connector-4.3.0


Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink AWS 
connectors



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] Release flink-connector-aws v4.3.0, release candidate #2

2024-04-19 Thread Danny Cranmer
Hi everyone,

Please review and vote on release candidate #2 for flink-connector-aws
v4.3.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

This version supports Flink 1.18 and 1.19.

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which are signed with the key with fingerprint 125FD8DB [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag v4.3.0-rc2 [5],
* website pull request listing the new release [6].
* CI build of the tag [7].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Release Manager

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353793
[2]
https://dist.apache.org/repos/dist/dev/flink/flink-connector-aws-4.3.0-rc2
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1721/
[5] https://github.com/apache/flink-connector-aws/releases/tag/v4.3.0-rc2
[6] https://github.com/apache/flink-web/pull/733
[7] https://github.com/apache/flink-connector-aws/actions/runs/8751694197


[jira] [Created] (FLINK-35173) Debezium Custom Time Serializer

2024-04-19 Thread ZhengYu Chen (Jira)
ZhengYu Chen created FLINK-35173:


 Summary: Debezium Custom Time Serializer 
 Key: FLINK-35173
 URL: https://issues.apache.org/jira/browse/FLINK-35173
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 3.1.0
Reporter: ZhengYu Chen
 Fix For: 3.1.0


Currently, Flink CDC produces wrong values for time types (DateTime, Time, 
Date, Timestamp) when the MySQL connector uses 
JsonDebeziumDeserializationSchema for deserialization. The root cause is 
that the timestamps returned by the underlying Debezium layer are in UTC 
(such as io.debezium.time.Timestamp). The community already has some 
[PRs|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4],
 but they do not work.

This issue proposes a solution based on Debezium's custom converter 
interface 
(https://debezium.io/documentation/reference/1.9/development/converters.html):
users can choose to convert the above four time types into STRING according 
to a specified time format, ensuring that the JSON is converted correctly 
when using the Flink DataStream API.


When the user enables this converter, it is configured via Debezium 
properties. An example DataStream use case:
{code:java}
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("converters", "datetime");
debeziumProperties.setProperty("datetime.database.type", DataBaseType.MYSQL.getType());
debeziumProperties.setProperty("datetime.type", "cn.xxx.sources.cdc.MysqlDebeziumConverter");
debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");
debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");
MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
        .hostname(url[0])
        .port(Integer.parseInt(url[1]))
        .databaseList(table.getDatabase())
        .tableList(getTablePattern(table))
        .username(table.getUserName())
        .password(table.getPassword())
        .debeziumProperties(debeziumProperties); {code}
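The essential conversion such a converter performs, rendering a UTC-based
Debezium timestamp in the configured zone, can be sketched with plain
java.time and no Debezium dependency (class and method names here are
illustrative):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampFormatDemo {
    // Debezium's io.debezium.time.Timestamp delivers epoch milliseconds in UTC;
    // formatting it at the session offset (e.g. UTC+8) restores the wall-clock value.
    static String format(long epochMillis, ZoneOffset zone, String pattern) {
        return Instant.ofEpochMilli(epochMillis)
                .atOffset(zone)
                .format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        long utcMillis = 0L; // 1970-01-01T00:00:00Z
        System.out.println(format(utcMillis, ZoneOffset.ofHours(8), "yyyy-MM-dd HH:mm:ss"));
        // prints 1970-01-01 08:00:00
    }
}
```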
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD

2024-04-19 Thread Thomas Weise
Thanks for the proposal.

How do you see potential effects on API server performance wrt. number of
objects vs mutations? Is the proposal more or less neutral in that regard?

Thanks for the thorough feedback Robert.

Couple more questions below.

-->

On Fri, Apr 19, 2024 at 5:07 AM Robert Metzger  wrote:

> Hi Mate,
> thanks for proposing this, I'm really excited about your FLIP. I hope my
> questions make sense to you:
>
> 1. I would like to discuss the "FlinkStateSnapshot" name and the fact that
> users have to use either the savepoint or checkpoint spec inside the
> FlinkStateSnapshot.
> Wouldn't it be more intuitive to introduce two CRs:
> FlinkSavepoint and FlinkCheckpoint
> Ideally they can internally share a lot of code paths, but from a user's
> perspective, the abstraction is much clearer.
>

There are probably pros and cons either way. For example it is desirable to
have a single list of state snapshots when looking for the initial
savepoint for a new deployment etc.


>
> 2. I also would like to discuss SavepointSpec.completed, as this name is
> not intuitive to me. How about "ignoreExisting"?
>
> 3. The FLIP proposal seems to leave error handling to the user, e.g. when
> you create a FlinkStateSnapshot, it will just move to status FAILED.
> Typically in K8s, with the control-loop pattern, resource creation is
> retried until it succeeds. I think it would be really nice if the
> FlinkStateSnapshot or FlinkSavepoint resource would retry based on a
> property in the resource. A "FlinkStateSnapshot.retries" number would
> indicate how often the user wants the operator to retry creating a
> savepoint, "retries = -1" means retry forever. In addition, we could
> consider a timeout as well, however, I haven't seen such a concept in K8s
> CRs yet.
> The benefit of this is that other tools relying on the K8s operator
> wouldn't have to implement this retry loop (which is quite natural for
> K8s), they would just have to wait for the CR they've created to transition
> into COMPLETED.
>
> 3. FlinkStateSnapshotStatus.error will only show the last error. What
> about using Events, so that we can show multiple errors and use the
> FlinkStateSnapshotState to report errors?
>
> 4. I wonder if it makes sense to use something like Pod Conditions (e.g.
> Savepoint Conditions):
> https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-conditions
> to track the completion status. We could have the following conditions:
> - Triggered
> - Completed
> - Failed
> The only benefit of this proposal that I see is that it would tell the
> user how long it took to create the savepoint.
>
> 5. You mention that "JobSpec.initialSavepointPath" will be deprecated. I
> assume we will introduce a new field for referencing a FlinkStateSnapshot
> CR? I think it would be good to cover this in the FLIP.
>
> Does that mean one would have to create a FlinkStateSnapshot CR when
starting a new deployment from savepoint? If so, that's rather complicated.
I would prefer something more simple/concise and would rather
keep initialSavepointPath


>
> One minor comment:
>
> "/** Dispose the savepoints upon CRD deletion. */"
>
> I think this should be "upon CR deletion", not "CRD deletion".
>
> Thanks again for this great FLIP!
>
> Best,
> Robert
>
>
> On Fri, Apr 19, 2024 at 9:01 AM Gyula Fóra  wrote:
>
>> Cc'ing some folks who gave positive feedback on this idea in the past.
>>
>> I would love to hear your thoughts on the proposed design
>>
>> Gyula
>>
>> On Tue, Apr 16, 2024 at 6:31 PM Őrhidi Mátyás 
>> wrote:
>>
>>> +1 Looking forward to it
>>>
>>> On Tue, Apr 16, 2024 at 8:56 AM Mate Czagany  wrote:
>>>
>>> > Thank you Gyula!
>>> >
>>> > I think that is a great idea. I have updated the Google doc to only
>>> > have 1 new configuration option of boolean type, which can be used to
>>> > signal the Operator to use the old mode.
>>> >
>>> > As also described in the configuration description, the Operator will
>>> > fall back to the old mode if the FlinkStateSnapshot CRD cannot be
>>> > found on the Kubernetes cluster.
>>> >
>>> > Regards,
>>> > Mate
>>> >
>>> > Gyula Fóra wrote (on Tue, Apr 16, 2024 at 17:01):
>>> >
>>> > > Thanks Mate, this is great stuff.
>>> > >
>>> > > Mate, I think the new configs should probably default to the new
>>> > > mode and they should only be useful for users to fall back to the
>>> > > old behaviour.
>>> > > We could by default use the new Snapshot CRD if the CRD is installed,
>>> > > otherwise use the old mode by default and log a warning on startup.
>>> > >
>>> > > So I am suggesting a "dynamic" default behaviour based on whether
>>> > > the new CRD was installed or not because we don't want to break
>>> > > operator startup.
>>> > >
>>> > > Gyula
>>> > >
>>> > > On Tue, Apr 16, 2024 at 4:48 PM Mate Czagany 
>>> wrote:
>>> > >
>>> > > > Hi Ferenc,
>>> > > >
>>> > > > Thank you for your comments, I have updated the Google docs with a
>>> > > > new section for the new 

[jira] [Created] (FLINK-35172) DDL statement is added to the Schema Change Event

2024-04-19 Thread melin (Jira)
melin created FLINK-35172:
-

 Summary: DDL statement is added to the Schema Change Event
 Key: FLINK-35172
 URL: https://issues.apache.org/jira/browse/FLINK-35172
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: melin


The current implementation of the Kafka pipeline data sink connector does 
not write DDL statements to the topic, because the original DDL statements 
are missing; DDL cannot be reconstructed from a Schema Change Event.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD

2024-04-19 Thread Robert Metzger
Hi Mate,
thanks for proposing this, I'm really excited about your FLIP. I hope my
questions make sense to you:

1. I would like to discuss the "FlinkStateSnapshot" name and the fact that
users have to use either the savepoint or checkpoint spec inside the
FlinkStateSnapshot.
Wouldn't it be more intuitive to introduce two CRs:
FlinkSavepoint and FlinkCheckpoint
Ideally they can internally share a lot of code paths, but from a user's
perspective, the abstraction is much clearer.

2. I also would like to discuss SavepointSpec.completed, as this name is
not intuitive to me. How about "ignoreExisting"?

3. The FLIP proposal seems to leave error handling to the user, e.g. when
you create a FlinkStateSnapshot, it will just move to status FAILED.
Typically in K8s, with the control-loop pattern, resource creation is
retried until it succeeds. I think it would be really nice if the
FlinkStateSnapshot or FlinkSavepoint resource would retry based on a
property in the resource. A "FlinkStateSnapshot.retries" number would
indicate how often the user wants the operator to retry creating a
savepoint, "retries = -1" means retry forever. In addition, we could
consider a timeout as well, however, I haven't seen such a concept in K8s
CRs yet.
The benefit of this is that other tools relying on the K8s operator
wouldn't have to implement this retry loop (which is quite natural for
K8s), they would just have to wait for the CR they've created to transition
into COMPLETED.

3. FlinkStateSnapshotStatus.error will only show the last error. What about
using Events, so that we can show multiple errors and use the
FlinkStateSnapshotState to report errors?

4. I wonder if it makes sense to use something like Pod Conditions (e.g.
Savepoint Conditions):
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-conditions
to track the completion status. We could have the following conditions:
- Triggered
- Completed
- Failed
The only benefit of this proposal that I see is that it would tell the user
how long it took to create the savepoint.

5. You mention that "JobSpec.initialSavepointPath" will be deprecated. I
assume we will introduce a new field for referencing a FlinkStateSnapshot
CR? I think it would be good to cover this in the FLIP.


One minor comment:

"/** Dispose the savepoints upon CRD deletion. */"

I think this should be "upon CR deletion", not "CRD deletion".

Thanks again for this great FLIP!

Best,
Robert


On Fri, Apr 19, 2024 at 9:01 AM Gyula Fóra  wrote:

> Cc'ing some folks who gave positive feedback on this idea in the past.
>
> I would love to hear your thoughts on the proposed design
>
> Gyula
>
> On Tue, Apr 16, 2024 at 6:31 PM Őrhidi Mátyás 
> wrote:
>
>> +1 Looking forward to it
>>
>> On Tue, Apr 16, 2024 at 8:56 AM Mate Czagany  wrote:
>>
>> > Thank you Gyula!
>> >
>> > I think that is a great idea. I have updated the Google doc to only
>> > have 1 new configuration option of boolean type, which can be used to
>> > signal the Operator to use the old mode.
>> >
>> > As also described in the configuration description, the Operator will
>> > fall back to the old mode if the FlinkStateSnapshot CRD cannot be
>> > found on the Kubernetes cluster.
>> >
>> > Regards,
>> > Mate
>> >
>> > Gyula Fóra wrote (on Tue, Apr 16, 2024 at 17:01):
>> >
>> > > Thanks Mate, this is great stuff.
>> > >
>> > > Mate, I think the new configs should probably default to the new
>> > > mode and they should only be useful for users to fall back to the
>> > > old behaviour.
>> > > We could by default use the new Snapshot CRD if the CRD is installed,
>> > > otherwise use the old mode by default and log a warning on startup.
>> > >
>> > > So I am suggesting a "dynamic" default behaviour based on whether
>> > > the new CRD was installed or not because we don't want to break
>> > > operator startup.
>> > >
>> > > Gyula
>> > >
>> > > On Tue, Apr 16, 2024 at 4:48 PM Mate Czagany 
>> wrote:
>> > >
>> > > > Hi Ferenc,
>> > > >
>> > > > Thank you for your comments, I have updated the Google docs with a
>> > > > new section for the new configs.
>> > > > All of the newly added config keys will have defaults set, and by
>> > > > default all the savepoint/checkpoint operations will use the old
>> > > > system: write their results to the FlinkDeployment/FlinkSessionJob
>> > > > status field.
>> > > >
>> > > > I have also added a default for the checkpoint type to be FULL
>> > > > (which is also the default currently). That was an oversight on my
>> > > > part to miss that.
>> > > >
>> > > > Regards,
>> > > > Mate
>> > > >
>> > > > Ferenc Csaky wrote (on Tue, Apr 16, 2024 at 16:10):
>> > > >
>> > > > > Thank you Mate for initiating this discussion. +1 for this idea.
>> > > > > Some Qs:
>> > > > >
>> > > > > Can you specify the newly introduced configurations in more
>> > > > > detail? Currently, it is not fully clear to me what are the
>> > > > > possible values of 

Re: [VOTE] FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines

2024-04-19 Thread Yuepeng Pan
+1(non-binding)

Best,
Yuepeng Pan

At 2024-04-19 15:22:04, "gongzhongqiang"  wrote:
>+1(non-binding)
>
>
>Best,
>
>Zhongqiang Gong
>
>Ron liu wrote on Wed, Apr 17, 2024 at 14:28:
>
>> Hi Dev,
>>
>> Thank you to everyone for the feedback on FLIP-435: Introduce a New
>> Materialized Table for Simplifying Data Pipelines[1][2].
>>
>> I'd like to start a vote for it. The vote will be open for at least 72
>> hours unless there is an objection or not enough votes.
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines
>> [2] https://lists.apache.org/thread/c1gnn3bvbfs8v1trlf975t327s4rsffs
>>
>> Best,
>> Ron
>>


[jira] [Created] (FLINK-35171) Class 'FlinkSqlParserImpl' is missing

2024-04-19 Thread chenyu (Jira)
chenyu created FLINK-35171:
--

 Summary: Class 'FlinkSqlParserImpl' is missing
 Key: FLINK-35171
 URL: https://issues.apache.org/jira/browse/FLINK-35171
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.19.0
Reporter: chenyu
 Attachments: image-2024-04-19-15-55-54-035.png

!image-2024-04-19-15-55-54-035.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines

2024-04-19 Thread gongzhongqiang
+1(non-binding)


Best,

Zhongqiang Gong

Ron liu  wrote on Wed, 17 Apr 2024, 14:28:

> Hi Dev,
>
> Thank you to everyone for the feedback on FLIP-435: Introduce a New
> Materialized Table for Simplifying Data Pipelines[1][2].
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours unless there is an objection or not enough votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines
> [2] https://lists.apache.org/thread/c1gnn3bvbfs8v1trlf975t327s4rsffs
>
> Best,
> Ron
>


[jira] [Created] (FLINK-35170) SqlServer connector support scanNewlyAddedTableEnabled param

2024-04-19 Thread fengfengzai (Jira)
fengfengzai created FLINK-35170:
---

 Summary: SqlServer connector support scanNewlyAddedTableEnabled param
 Key: FLINK-35170
 URL: https://issues.apache.org/jira/browse/FLINK-35170
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 2.0.0
Reporter: fengfengzai


The SqlServer connector should support the scanNewlyAddedTableEnabled param.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD

2024-04-19 Thread Gyula Fóra
Cc'ing some folks who gave positive feedback on this idea in the past.

I would love to hear your thoughts on the proposed design

Gyula

On Tue, Apr 16, 2024 at 6:31 PM Őrhidi Mátyás 
wrote:

> +1 Looking forward to it
>
> On Tue, Apr 16, 2024 at 8:56 AM Mate Czagany  wrote:
>
> > Thank you Gyula!
> >
> > I think that is a great idea. I have updated the Google doc to only have
> 1
> > new configuration option of boolean type, which can be used to signal the
> > Operator to use the old mode.
> >
> > Also described in the configuration description, the Operator will
> fallback
> > to the old mode if the FlinkStateSnapshot CRD cannot be found on the
> > Kubernetes cluster.
> >
> > Regards,
> > Mate
> >
> > Gyula Fóra  wrote (on Tue, 16 Apr 2024, 17:01):
> >
> > > Thanks Mate, this is great stuff.
> > >
> > > Mate, I think the new configs should probably default to the new mode
> and
> > > they should only be useful for users to fall back to the old behaviour.
> > > We could by default use the new Snapshot CRD if the CRD is installed,
> > > otherwise use the old mode by default and log a warning on startup.
> > >
> > > So I am suggesting a "dynamic" default behaviour based on whether the
> new
> > > CRD was installed or not because we don't want to break operator
> startup.
> > >
> > > Gyula
> > >
> > > On Tue, Apr 16, 2024 at 4:48 PM Mate Czagany 
> wrote:
> > >
> > > > Hi Ferenc,
> > > >
> > > > Thank you for your comments, I have updated the Google docs with a
> new
> > > > section for the new configs.
> > > > All of the newly added config keys will have defaults set, and by
> > default
> > > > all the savepoint/checkpoint operations will use the old system:
> write
> > > > their results to the FlinkDeployment/FlinkSessionJob status field.
> > > >
> > > > I have also added a default for the checkpoint type to be FULL (which
> > is
> > > > also the default currently). That was an oversight on my part to miss
> > > that.
> > > >
> > > > Regards,
> > > > Mate
> > > >
> > > > Ferenc Csaky  wrote (on Tue, 16 Apr 2024, 16:10):
> > > >
> > > > > Thank you Mate for initiating this discussion. +1 for this idea.
> > > > > Some Qs:
> > > > >
> > > > > Can you specify the newly introduced configurations in more
> > > > > details? Currently, it is not fully clear to me what are the
> > > > > possible values of `kubernetes.operator.periodic.savepoint.mode`,
> > > > > is it optional, has a default value?
> > > > >
> > > > > I see that `SavepointSpec.formatType` has a default, although
> > > > > `CheckpointSpec.checkpointType` does not. Are we inferring that from
> > > > > the config? My point is, in general I think it would be good to
> > > > > handle the two snapshot types in a similar way when it makes sense
> > > > > to minimize any kind of confusion.
> > > > >
> > > > > Best,
> > > > > Ferenc
> > > > >
> > > > >
> > > > >
> > > > > On Tuesday, April 16th, 2024 at 11:34, Mate Czagany <
> > > czmat...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > >
> > > > > >
> > > > > > Hi Everyone,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-446: Kubernetes
> Operator
> > > > State
> > > > > > Snapshot CRD.
> > > > > >
> > > > > > This FLIP adds a new custom resource for Operator users to create
> > and
> > > > > > manage their savepoints and checkpoints. I have also developed an
> > > > initial
> > > > > > POC to prove that this approach is feasible, you can find the
> link
> > > for
> > > > > that
> > > > > > in the FLIP.
> > > > > >
> > > > > > There is a Confluence page [1] and a Google Docs page [2] as I do
> > not
> > > > > have
> > > > > > a Confluence account yet.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD
> > > > > > [2]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1VdfLFaE4i6ESbCQ38CH7TKOiPQVvXeOxNV2FeSMnOTg
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > > Mate
> > > > >
> > > >
> > >
> >
>
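The "dynamic default" behaviour discussed in this thread (use the new FlinkStateSnapshot CRD when it is installed, otherwise fall back to the legacy status-field mode and log a warning on startup) boils down to a small piece of decision logic. The sketch below is illustrative only — the class, enum, and parameter names (`SnapshotModeSelector`, `legacyModeForced`, `crdInstalled`) are hypothetical and not part of the operator's actual API.

```java
// Illustrative sketch of the FLIP-446 "dynamic default" mode selection.
// All names here are hypothetical, not the operator's real API.
public class SnapshotModeSelector {

    enum SnapshotMode { SNAPSHOT_RESOURCE, LEGACY_STATUS_FIELD }

    /**
     * Picks the snapshot handling mode:
     *  - if the user explicitly forces the old behaviour, use it;
     *  - otherwise use the new FlinkStateSnapshot CRD when it is installed;
     *  - fall back to the legacy mode (with a warning) when the CRD is absent.
     */
    static SnapshotMode select(boolean legacyModeForced, boolean crdInstalled) {
        if (legacyModeForced) {
            return SnapshotMode.LEGACY_STATUS_FIELD;
        }
        if (crdInstalled) {
            return SnapshotMode.SNAPSHOT_RESOURCE;
        }
        System.err.println(
                "WARN: FlinkStateSnapshot CRD not found, falling back to legacy snapshot handling");
        return SnapshotMode.LEGACY_STATUS_FIELD;
    }

    public static void main(String[] args) {
        System.out.println(select(false, true));   // SNAPSHOT_RESOURCE
        System.out.println(select(false, false));  // LEGACY_STATUS_FIELD (logs a warning)
        System.out.println(select(true, true));    // LEGACY_STATUS_FIELD
    }
}
```

Keeping the explicit flag as a pure override, as suggested above, means operator startup never breaks: the worst case on a cluster without the CRD is a logged warning plus the old, well-tested behaviour.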