[jira] [Created] (FLINK-32364) Add Rescaling benchmark for ChangelogStateBackend

2023-06-15 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-32364:


 Summary: Add Rescaling benchmark for ChangelogStateBackend
 Key: FLINK-32364
 URL: https://issues.apache.org/jira/browse/FLINK-32364
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu


After FLINK-23484, we could support a rescaling benchmark just like HEAP and 
ROCKSDB.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32363) Calcite 1.21 supports type coercion but Flink doesn't enable it in validation

2023-06-15 Thread jackylau (Jira)
jackylau created FLINK-32363:


 Summary: Calcite 1.21 supports type coercion but Flink doesn't 
enable it in validation
 Key: FLINK-32363
 URL: https://issues.apache.org/jira/browse/FLINK-32363
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.18.0
Reporter: jackylau
 Fix For: 1.18.0


1) Calcite 1.21 supports type coercion and enables it by default, while Flink disables it.

2) Spark/MySQL can run it.

3) Although we can make it run with select count(distinct `if`(1>5, 'x', 
cast(null as varchar)));

I think we should enable it or offer a config to enable it.

 
{code:java}
Flink SQL> select count(distinct `if`(1>5, 'x', null));
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 
'NULL'
{code}
{code:java}
// it can run in spark
spark-sql (default)> select count(distinct `if`(1>5, 'x', null)); 
0
{code}
 
{code:java}
private def createSqlValidator(catalogReader: CalciteCatalogReader) = {
  val validator = new FlinkCalciteSqlValidator(
operatorTable,
catalogReader,
typeFactory,
SqlValidator.Config.DEFAULT
  .withIdentifierExpansion(true)
  .withDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation)
  .withTypeCoercionEnabled(false)
  ) // Disable implicit type coercion for now.
  validator
}
{code}
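
A hedged sketch of a config-gated alternative using Calcite's validator config 
(the boolean flag's source is hypothetical; Flink has no such option today):

{code:java}
import org.apache.calcite.sql.validate.SqlValidator;

public class ValidatorConfigSketch {
    // Sketch only: gate type coercion behind a flag instead of hard-coding false.
    public static SqlValidator.Config configFor(boolean typeCoercionEnabled) {
        return SqlValidator.Config.DEFAULT
                .withIdentifierExpansion(true)
                .withTypeCoercionEnabled(typeCoercionEnabled);
    }
}
{code}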



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-15 Thread Dong Lin
Hi again Piotr,

Thank you for the reply. Please see my reply inline.

On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski 
wrote:

> Hi again Dong,
>
> > I understand that JM will get the backpressure-related metrics every time
> > the RestServerEndpoint receives the REST request to get these metrics.
> But
> > I am not sure if RestServerEndpoint is already always receiving the REST
> > metrics at regular interval (suppose there is no human manually
> > opening/clicking the Flink Web UI). And if it does, what is the interval?
>
> Good catch, I had thought that metrics are pre-emptively sent to the JM every 10
> seconds.
> Indeed that's not the case at the moment, and that would have to be
> improved.
>
> > I would be surprised if Flink is already paying this much overhead just
> for
> > metrics monitoring. That is the main reason I still doubt it is true. Can
> > you show where this 100 ms is currently configured?
> >
> > Alternatively, maybe you mean that we should add extra code to invoke the
> > REST API at 100 ms interval. Then that means we need to considerably
> > increase the network/cpu overhead at JM, where the overhead will increase
> > as the number of TM/slots increase, which may pose risk to the
> scalability
> > of the proposed design. I am not sure we should do this. What do you
> think?
>
> Sorry. I didn't mean metric should be reported every 100ms. I meant that
> "backPressuredTimeMsPerSecond (metric) would report (a value of) 100ms/s."
> once per metric interval (10s?).
>

Suppose there are 1000 subtasks and each subtask has a 1% chance of being
"backpressured" at a given time (due to random traffic spikes). Then at any
given time, the chance of the job
being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
backpressure metric once a second, the estimated time for the job
to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
sec = 6.4 hours.
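
A quick sanity check of that arithmetic (plain Java, illustrative only):

  double pNotBackpressured = Math.pow(1 - 0.01, 1000); // ~= 4.32e-5
  double expectedSeconds = 1.0 / pNotBackpressured;    // ~= 23163 s ~= 6.4 h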

This means that the job will effectively always use the longer
checkpointing interval. It looks like a real concern, right?


> > - What is the interface of this CheckpointTrigger? For example, are we
> > going to give CheckpointTrigger a context that it can use to fetch
> > arbitrary metric values? This can help us understand what information
> this
> > user-defined CheckpointTrigger can use to make the checkpoint decision.
>
> I honestly don't think this is important at this stage of the discussion.
> It could have
> whatever interface we would deem to be best. Required things:
>
> - access to at least a subset of metrics that the given `CheckpointTrigger`
> requests,
>   for example via some registration mechanism, so we don't have to fetch
> all of the
>   metrics all the time from TMs.
> - some way to influence `CheckpointCoordinator`. Either via manually
> triggering
>   checkpoints, and/or ability to change the checkpointing interval.
>

Hmm... I honestly think it will be useful to know the APIs, for the
following reasons.

We would need to know the concrete APIs to gauge the following:
- For the use-case mentioned in FLIP-309 motivation section, would the APIs
of this alternative approach be more or less usable?
- Can these APIs reliably address the extra use-case (e.g. allow
checkpointing interval to change dynamically even during the unbounded
phase) as it claims?
- Can these APIs be decoupled from the APIs currently proposed in FLIP-309?

For example, if the APIs of this alternative approach can be decoupled from
the APIs currently proposed in FLIP-309, then it might be reasonable to
work on this extra use-case with a more advanced/complicated design
separately in a followup work.


> > - Where is this CheckpointTrigger running? For example, is it going to
> run
> > on the subtask of every source operator? Or is it going to run on the JM?
>
> IMO on the JM.
>
> > - Are we going to provide a default implementation of this
> > CheckpointTrigger in Flink that implements the algorithm described below,
> > or do we expect each source operator developer to implement their own
> > CheckpointTrigger?
>
> As I mentioned before, I think we should provide at the very least the
> implementation
> that replaces the current triggering mechanism (statically configured
> checkpointing interval)
> and it would be great to provide the backpressure monitoring trigger as
> well.
>

I agree that if there is a good use-case that can be addressed by the
proposed CheckpointTrigger, then it is reasonable
to add CheckpointTrigger and replace the current triggering mechanism with
it.

I also agree that we will likely find such a use-case. For example, suppose
the source records have event timestamps, then it is likely
that we can use the trigger to dynamically control the checkpointing
interval based on the difference between the watermark and current system
time.

But I am not sure the addition of this CheckpointTrigger should be coupled
with FLIP-309. Whether or not it is coupled probably depends on the
concrete API design around 

Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-06-15 Thread Feng Jin
Hi everyone,

Thank you all for your valuable input. If there are no further questions or
concerns regarding FLIP-308[1], I would like to start voting on Monday,
June 19th.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel


Best,

Feng
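
For readers joining the thread late, a minimal sketch of the time-travel query
shape under discussion (table name and timestamp are illustrative; the Table
API calls are the real ones):

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  public class TimeTravelSketch {
      public static void main(String[] args) {
          TableEnvironment tEnv =
                  TableEnvironment.create(EnvironmentSettings.inBatchMode());
          // Query the table as of a point in time; per the discussion below, a
          // TIMESTAMP (no time zone) literal is interpreted in the session time zone.
          tEnv.executeSql(
              "SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00'"
          ).print();
      }
  }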

On Mon, Jun 12, 2023 at 10:57 AM Feng Jin  wrote:

> Thanks Benchao and Leonard.
>
> 'Implicit type conversion' makes sense to me. I will emphasize the
> 'implicit type conversion' in the document.
>
>
> Best,
> Feng
>
> On Sat, Jun 10, 2023 at 10:11 AM Benchao Li  wrote:
>
>> Thanks Leonard for the input; the "implicit type conversion" way sounds good
>> to me.
>> I also agree that this should be done in planner instead of connector,
>> it'll be a lot easier for connector development.
>>
>> Leonard Xu wrote on Fri, Jun 9, 2023 at 20:11:
>>
>> > About the semantics consideration, I have some new input after rethinking.
>> >
>> > 1. We can support both TIMESTAMP and TIMESTAMP_LTZ expressions following
>> > the syntax  `SELECT [column_name(s)] FROM [table_name] FOR SYSTEM_TIME
>> AS
>> > OF `
>> >
>> > 2. For the TIMESTAMP_LTZ type, giving a long instant value to CatalogTable is
>> > pretty intuitive; for the TIMESTAMP type, it will be implicitly cast to the
>> > TIMESTAMP_LTZ type by the planner using the session time zone and then passed
>> > to CatalogTable. This case can be considered as a function AsOfSnapshot(Table
>> > t, TIMESTAMP_LTZ arg), which takes arg with TIMESTAMP_LTZ type, but our
>> > framework supports implicit type conversion, thus users can also pass arg
>> > with TIMESTAMP type. Hint: Spark[1] did the implicit type conversion too.
>> >
>> > 3. I also considered handing the implicit type conversion over to the
>> > connector instead of the planner, such as passing a TIMESTAMP literal and
>> > having the connector use the session time zone to perform the type
>> > conversion, but this is more complicated than the planner handling it, and
>> > it's not friendly to connector developers.
>> >
>> > 4. The last point: TIMESTAMP_LTZ '1970-01-01 00:00:04.001' should be an
>> > invalid expression, as you cannot define an instant point (i.e.,
>> > TIMESTAMP_LTZ semantics in SQL) from a timestamp literal without a
>> > time zone. You can use an explicit type conversion like `cast(ts_ntz as
>> > TIMESTAMP_LTZ)` after `FOR SYSTEM_TIME AS OF ` if you want to use a
>> > Timestamp type/expression/literal without a time zone.
>> >
>> > 5. One last point: the TIMESTAMP_LTZ type of Flink SQL supports DST
>> > time[2] well, which will help users avoid many corner cases.
>> >
>> >
>> > Best,
>> > Leonard
>> >
>> > [1]
>> >
>> https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
>> > [2]
>> >
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/#daylight-saving-time-support
>> >
>> >
>> >
>> >
>> > > On Jun 9, 2023, at 1:13 PM, Benchao Li  wrote:
>> > >
>> > > As you can see, you must use `UNIX_TIMESTAMP` to do this work; that's
>> > > where the time zone conversion happens.
>> > >
>> > > What I'm talking about is casting timestamp/timestamp_ltz to long
>> > > directly; that's why the semantics are tricky when you are casting
>> > > timestamp to long using a time zone.
>> > >
>> > > For other systems, such as SQL Server[1], they actually use a string
>> > > instead of a timestamp literal: `FOR SYSTEM_TIME AS OF '2021-01-01
>> > > 00:00:00.000'`. I'm not sure whether they convert the string implicitly
>> > > to TIMESTAMP_LTZ, or whether they just have a different definition of
>> > > the syntax.
>> > >
>> > > But for us, we are definitely using a timestamp/timestamp_ltz literal
>> > > here; that's why it is special, and we must highlight the behavior that
>> > > we are converting a timestamp-without-time-zone literal to long using
>> > > the session time zone.
>> > >
>> > > [1]
>> > >
>> >
>> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16
>> > >
>> > > Feng Jin wrote on Thu, Jun 8, 2023 at 11:35:
>> > >
>> > >> Hi all,
>> > >>
>> > >> thanks for your input
>> > >>
>> > >>
>> > >> @Benchao
>> > >>
>> > >>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
>> > >> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC
>> > timezone,
>> > >> which is not usually expected by users.
>> > >>
>> > >> It was indeed the case before Flink 1.13, but now my understanding is
>> > >> that there have been some slight changes in the definition of TIMESTAMP.
>> > >>
>> > >> TIMESTAMP is currently used to specify the year, month, day, hour, minute
>> > >> and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col
>> > >> AS STRING))* to convert between *TIMESTAMP values* and *long values*. The
>> > >> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore,
>> > >> whether converting TIMESTAMP or TIMESTAMP_LTZ to Long 

[jira] [Created] (FLINK-32362) SourceAlignment announceCombinedWatermark periodic task may be lost

2023-06-15 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-32362:
---

 Summary: SourceAlignment announceCombinedWatermark periodic task 
may be lost
 Key: FLINK-32362
 URL: https://issues.apache.org/jira/browse/FLINK-32362
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.0
Reporter: Cai Liuyang


When we use source alignment, we also found another problem: 
announceCombinedWatermark may throw an exception (like "subtask 25 is not ready 
yet to receive events"; that subtask may be under failover), which will cause 
the periodic task to stop running (ThreadPoolExecutor will not schedule a 
periodic task again if it throws an exception).

I think we should increase the robustness of the announceCombinedWatermark 
function so that it avoids throwing any exception (if a send fails, just wait 
for the next send) (code see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
 )
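
A hedged sketch of the suggested hardening (the wiring below is simplified; 
announceCombinedWatermark stands in for the coordinator method linked above):

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AnnounceWatermarkSketch {
    public static void schedule(Runnable announceCombinedWatermark) {
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(
                () -> {
                    try {
                        announceCombinedWatermark.run();
                    } catch (Exception e) {
                        // A subtask may be mid-failover and not ready to receive
                        // events; log and retry on the next period instead of
                        // letting the exception cancel the periodic task.
                        System.err.println("Announce failed, will retry: " + e);
                    }
                },
                0, 1, TimeUnit.SECONDS);
    }
}
{code}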



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-15 Thread Xintong Song
>
> Public API is a well-defined common concept, and one of its
> conventions is that it only changes with a major version change.
>

I agree. And from my understanding, demoting a Public API is also a kind of
such change, just like removing one, which can only happen with major
version bumps. I'm not proposing to allow demoting Public APIs at any time,
but only in the case where a major version bump happens before reaching the
2-minor-release migration period. Actually, demoting would be a weaker
change compared to removing the API immediately upon major version bumps,
in order to keep the commitment about the 2-minor-release migration period.
If the concern is that `@Public` -> `@PublicEvolving` sounds against
conventions, we may introduce a new annotation if necessary, e.g.,
`@PublicRetiring`, to avoid confusion.

But it should be
> completely OK to bump up the major version if we really want to get rid of
> a public API, right?
>

I'm not sure about this. Yes, it's completely "legal" that we bump up the
major version whenever a breaking change is needed. However, this also
weakens the value of the commitment that public APIs will stay stable
within the major release series, as the series can end anytime. IMHO, short
major release series are not something that "makes the end users happy", but
backdoors that allow us as the developers to make frequent breaking
changes. On the contrary, with the demoting approach, we can still have
longer major release series, while only allowing Public APIs deprecated at
the end of the previous major version to be removed in the next major
version.

Given our track record I would prefer a regular cycle (1-2 years) to
> force us to think about this whole topic, and not put it again to the
> wayside and giving us (and users) a clear expectation on when breaking
> changes can be made.
>

+1. I personally think 2-3 years would be a good time for new major
versions, or longer if there are no breaking changes needed. That makes 1-2
years a perfect time to revisit the topic, while leaving us more time to
prepare the major release if needed.

Best,

Xintong



On Thu, Jun 15, 2023 at 10:09 PM Chesnay Schepler 
wrote:

> On 13/06/2023 17:26, Becket Qin wrote:
> > It would be valuable if we can avoid releasing minor versions for
> previous
> > major versions.
>
> On paper, /absolutely /agree, but I'm not sure how viable that is in
> practice.
>
> On the current 2.0 agenda is potentially dropping support for Java 8/11,
> which may very well be a problem for our current users.
>
>
> On 13/06/2023 17:26, Becket Qin wrote:
> > Thanks for the feedback and sorry for the confusion about Public API
> > deprecation. I just noticed that there was a mistake in the NOTES part
> for
> > Public API due to a copy-paste error... I just fixed it.
> I'm very relieved to hear that. Glad to hear that we are on the same
> page on that note.
>
>
> On 15/06/2023 15:20, Becket Qin wrote:
> > But it should be
> > completely OK to bump up the major version if we really want to get rid
> of
> > a public API, right?
>
> Technically yes, but look at how long it took to get us to 2.0. ;)
>
> There's a separate discussion to be had on the cadence of major releases
> going forward, and there seem to be different opinions on that.
>
> If we take the Kafka example of 2 minor releases between major ones,
> that for us means that users have to potentially deal with breaking
> changes every 6 months, which seems like a lot.
>
> Given our track record I would prefer a regular cycle (1-2 years) to
> force us to think about this whole topic, and not put it again to the
> wayside and giving us (and users) a clear expectation on when breaking
> changes can be made.
>
> But again, maybe this should be in a separate thread.
>
> On 14/06/2023 11:37, Becket Qin wrote:
> > Do you have an example of behavioral change in mind? Not sure I fully
> > understand the concern for behavioral change here.
>
> This could be a lot of things. It can be performance in certain
> edge-cases, a bug fix that users (maybe unknowingly) relied upon
> (https://xkcd.com/1172/), a semantic change to some API.
>
> For a concrete example, consider the job submission. A few releases back
> we made changes such that the initialization of the job master happens
> asynchronously.
> This meant the job submission call returns sooner, and the job state
> enum was extended to cover this state.
> API-wise we consider this a compatible change, but the observed behavior
> may be different.
>
> Metrics are another example; I believe over time we changed what some
> metrics returned a few times.
>


[jira] [Created] (FLINK-32361) error after replacing a dependent jar file

2023-06-15 Thread Spongebob (Jira)
Spongebob created FLINK-32361:
-

 Summary: error after replacing a dependent jar file
 Key: FLINK-32361
 URL: https://issues.apache.org/jira/browse/FLINK-32361
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.5
Reporter: Spongebob


In standalone session mode, I have one dependent jar file named 'A.jar' in 
the folder `lib1`, so I submit my app via the command `flink run -C 
file:///lib1/A.jar -c Application ./myApp.jar`. It runs normally. 

I also have the same jar file named 'A.jar' in the folder `lib2`, which was 
copied from `lib1`. I then delete A.jar in `lib1`, copy the same jar from 
`lib2` back to `lib1`, and re-submit the application. Finally, I get a 
ClassNotFoundException for a class that belongs to A.jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] Updates to Flink's external connector CI workflows

2023-06-15 Thread Mason Chen
Can't wait to use this in the Kafka repo, especially the thread dump.
Thanks Martijn and Sergey!

Best,
Mason

On Thu, Jun 15, 2023 at 4:05 PM Sergey Nuyanzin  wrote:

> Thanks a lot Martijn for making it
>
> On Thu, Jun 15, 2023 at 2:07 PM Leonard Xu  wrote:
>
> > Thanks Martijn for the great work !
> >
> >
> > Best,
> > Leonard
> >
> >
>
> --
> Best regards,
> Sergey
>


Re: [ANNOUNCE] Updates to Flink's external connector CI workflows

2023-06-15 Thread Sergey Nuyanzin
Thanks a lot Martijn for making it

On Thu, Jun 15, 2023 at 2:07 PM Leonard Xu  wrote:

> Thanks Martijn for the great work !
>
>
> Best,
> Leonard
>
>

-- 
Best regards,
Sergey


Flink-Metrics Prometheus - Native Histograms / Native Counters

2023-06-15 Thread Ryan van Huuksloot
Hello,

Internally we use the flink-metrics-prometheus jar and we noticed that the
code is a little out of date. Primarily, there are new metric types in
Prometheus that would allow the exporter to write Counters and
Histograms as native metrics in Prometheus (vs. writing them as Gauges).

I noticed that there was a closed PR for the simpleclient:
https://github.com/apache/flink/pull/21047 - which has what is required for
the native metrics but may create other maintenance work.

Is there any appetite from the community to update this exporter?

Thanks,

Ryan van Huuksloot
Sr. Production Engineer | Streaming Platform



[VOTE] FLIP-246: Dynamic Kafka Source (originally Multi Cluster Kafka Source)

2023-06-15 Thread Mason Chen
Hi all,

Thank you to everyone for the feedback on FLIP-246 [1]. Based on the
discussion thread [2], we have come to a consensus on the design and are
ready to take a vote to contribute this to Flink.

This voting thread will be open for at least 72 hours (excluding weekends,
until June 20th 10:00AM PST) unless there is an objection or an
insufficient number of votes.

(Optional) If you have an opinion on the naming of the connector, please
include it in your vote:
1. DynamicKafkaSource
2. MultiClusterKafkaSource
3. DiscoveringKafkaSource

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
[2] https://lists.apache.org/thread/vz7nw5qzvmxwnpktnofc9p13s1dzqm6z

Best,
Mason


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-15 Thread Piotr Nowojski
Hi again Dong,

> I understand that JM will get the backpressure-related metrics every time
> the RestServerEndpoint receives the REST request to get these metrics. But
> I am not sure if RestServerEndpoint is already always receiving the REST
> metrics at regular interval (suppose there is no human manually
> opening/clicking the Flink Web UI). And if it does, what is the interval?

Good catch, I had thought that metrics are pre-emptively sent to the JM every 10
seconds.
Indeed that's not the case at the moment, and that would have to be
improved.

> I would be surprised if Flink is already paying this much overhead just
for
> metrics monitoring. That is the main reason I still doubt it is true. Can
> you show where this 100 ms is currently configured?
>
> Alternatively, maybe you mean that we should add extra code to invoke the
> REST API at 100 ms interval. Then that means we need to considerably
> increase the network/cpu overhead at JM, where the overhead will increase
> as the number of TM/slots increase, which may pose risk to the scalability
> of the proposed design. I am not sure we should do this. What do you
think?

Sorry. I didn't mean metric should be reported every 100ms. I meant that
"backPressuredTimeMsPerSecond (metric) would report (a value of) 100ms/s."
once per metric interval (10s?).

> - What is the interface of this CheckpointTrigger? For example, are we
> going to give CheckpointTrigger a context that it can use to fetch
> arbitrary metric values? This can help us understand what information this
> user-defined CheckpointTrigger can use to make the checkpoint decision.

I honestly don't think this is important at this stage of the discussion.
It could have
whatever interface we would deem to be best. Required things:

- access to at least a subset of metrics that the given `CheckpointTrigger`
requests,
  for example via some registration mechanism, so we don't have to fetch
all of the
  metrics all the time from TMs.
- some way to influence `CheckpointCoordinator`. Either via manually
triggering
  checkpoints, and/or ability to change the checkpointing interval.
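
To make that concrete, a rough sketch of what such an interface might look
like (all names below are placeholders, not a proposal of the final API):

import java.time.Duration;
import java.util.Map;
import java.util.Set;

interface CheckpointController {
    void triggerCheckpoint();                      // manually trigger one now
    void setCheckpointInterval(Duration interval); // adjust the interval
}

interface CheckpointTrigger {
    // Metric names this trigger registers for, so the JM only has to
    // fetch a subset of metrics from the TMs.
    Set<String> requestedMetrics();

    // Invoked by the JM with fresh values of the requested metrics.
    void onMetrics(Map<String, Double> metrics, CheckpointController controller);
}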

> - Where is this CheckpointTrigger running? For example, is it going to run
> on the subtask of every source operator? Or is it going to run on the JM?

IMO on the JM.

> - Are we going to provide a default implementation of this
> CheckpointTrigger in Flink that implements the algorithm described below,
> or do we expect each source operator developer to implement their own
> CheckpointTrigger?

As I mentioned before, I think we should provide at the very least the
implementation
that replaces the current triggering mechanism (statically configured
checkpointing interval)
and it would be great to provide the backpressure monitoring trigger as
well.
If you are adamant that the backpressure monitoring doesn't cover your use
case well enough, I would be OK to provide the hacky version that I also
mentioned before:

"""
Especially since, if my proposed algorithm doesn't work well enough, there is
an obvious solution: any source could add a metric, let's say
"processingBacklog: true/false", and the `CheckpointTrigger`
could use this as an override to always switch to the
"slowCheckpointInterval". I don't think we need it, but that's always an
option
that would be basically equivalent to your original proposal.
"""

> - How can users specify the fastCheckpointInterval/slowCheckpointInterval?
> For example, will we provide APIs on the CheckpointTrigger that end-users
> can use to specify the checkpointing interval? What would that look like?

Also as I mentioned before, just like metric reporters are configured:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
Every CheckpointTrigger could have its own custom configuration.

> Overall, my gut feel is that the alternative approach based on
> CheckpointTrigger is more complicated

Yes, as usual, more generic things are more complicated, but often more
useful in the long run.

> and harder to use.

I don't agree. Why would setting this in the config:

execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: 1s
execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: 30s

(note that we could even provide a shortcut to the above construct via:

execution.checkpointing.fast-interval: 1s
execution.checkpointing.slow-interval: 30s
)

be harder than setting two/three checkpoint intervals, one in the
config or via `env.enableCheckpointing(x)`, and secondly passing one/two
(fast/slow) values on the source itself?

> And it probably also has the issues of "having two places to configure
checkpointing
> interval" and "giving flexibility for every source to implement a
different
> API" (as mentioned below).

No, it doesn't.

> IMO, it is a hard-requirement for the user-facing API to be
> clearly defined and users should be able to use the API 

[jira] [Created] (FLINK-32360) Optimize DataStream#coGroup in stream mode when results are emitted at end of input

2023-06-15 Thread Dong Lin (Jira)
Dong Lin created FLINK-32360:


 Summary: Optimize DataStream#coGroup in stream mode when results 
are emitted at end of input
 Key: FLINK-32360
 URL: https://issues.apache.org/jira/browse/FLINK-32360
 Project: Flink
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-15 Thread Chesnay Schepler

On 13/06/2023 17:26, Becket Qin wrote:

It would be valuable if we can avoid releasing minor versions for previous
major versions.


On paper, /absolutely /agree, but I'm not sure how viable that is in 
practice.


On the current 2.0 agenda is potentially dropping support for Java 8/11, 
which may very well be a problem for our current users.



On 13/06/2023 17:26, Becket Qin wrote:

Thanks for the feedback and sorry for the confusion about Public API
deprecation. I just noticed that there was a mistake in the NOTES part for
Public API due to a copy-paste error... I just fixed it.

I'm very relieved to hear that. Glad to hear that we are on the same 
page on that note.



On 15/06/2023 15:20, Becket Qin wrote:

But it should be
completely OK to bump up the major version if we really want to get rid of
a public API, right?


Technically yes, but look at how long it took to get us to 2.0. ;)

There's a separate discussion to be had on the cadence of major releases 
going forward, and there seem to be different opinions on that.


If we take the Kafka example of 2 minor releases between major ones, 
that for us means that users have to potentially deal with breaking 
changes every 6 months, which seems like a lot.


Given our track record I would prefer a regular cycle (1-2 years) to 
force us to think about this whole topic, and not put it again to the 
wayside and giving us (and users) a clear expectation on when breaking 
changes can be made.


But again, maybe this should be in a separate thread.

On 14/06/2023 11:37, Becket Qin wrote:

Do you have an example of behavioral change in mind? Not sure I fully
understand the concern for behavioral change here.


This could be a lot of things. It can be performance in certain 
edge-cases, a bug fix that users (maybe unknowingly) relied upon 
(https://xkcd.com/1172/), a semantic change to some API.


For a concrete example, consider the job submission. A few releases back 
we made changes such that the initialization of the job master happens 
asynchronously.
This meant the job submission call returns sooner, and the job state 
enum was extended to cover this state.
API-wise we consider this a compatible change, but the observed behavior 
may be different.


Metrics are another example; I believe over time we changed what some 
metrics returned a few times.


Re: AsyncFunction vs Async Sink

2023-06-15 Thread Teoh, Hong
Hi Lu,

> 1. Is there any problem if we use Async Function for such a use case? We can 
> simply drop the output and use Unordered mode.


As far as I can tell, it is similar, other than the retry strategy available 
for AsyncFunctions and batching for the Async Sink. Both should work in Flink.
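
A rough sketch of that writer pattern (ExternalClient is a hypothetical 
stand-in; RichAsyncFunction, ResultFuture and AsyncDataStream are the real 
Flink classes):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncWriteFunction extends RichAsyncFunction<String, Void> {
    /** Hypothetical non-blocking client for the external service. */
    public interface ExternalClient {
        CompletableFuture<Void> writeAsync(String record);
    }

    private transient ExternalClient client; // initialize in open()

    @Override
    public void asyncInvoke(String record, ResultFuture<Void> resultFuture) {
        client.writeAsync(record).whenComplete((ignored, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error); // fail/retry the record
            } else {
                resultFuture.complete(Collections.emptyList()); // drop the output
            }
        });
    }
}

// Wire-up, unordered since the (empty) results don't need ordering:
// AsyncDataStream.unorderedWait(stream, new AsyncWriteFunction(), 30, TimeUnit.SECONDS);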


> 2. For AsyncFunction and Async Sink, does it make sense that both could 
> share the same underlying implementation so that features like batching and 
> rate limiting can benefit both?

Good question - I think there are quite a lot of similarities; that's why the 
interface is similar. However, I think the end use-case is different. For 
example, the Async Sink might want to implement support for some form of 
two-phase commit on the sink (at least once guarantee). This is slightly more 
complicated with an AsyncFunction.



Regards,
Hong



On 15 Jun 2023, at 00:26, Lu Niu  wrote:



Thanks, Hong!

I understand that if the use case is simply to write something to an external 
service, the Async Sink is a good option that provides features like batching, 
state management and rate limiting. I have some follow-up questions:

1. Is there any problem if we use Async Function for such a use case? We can 
simply drop the output and use Unordered mode.
2. For AsyncFunction and Async Sink, does it make sense that both could share 
the same underlying implementation so that features like batching and rate 
limiting can benefit both?

Best
Lu


On Wed, Jun 14, 2023 at 2:20 PM Teoh, Hong 
mailto:lian...@amazon.co.uk>> wrote:
Hi Lu,

Thanks for your question. See below for my understanding.

I would recommend using the Async Sink if you are writing to the external 
service as the final output of your job graph, and if you don’t have the 
ordered requirement that updates to the external system must be done before 
updates to some other external system within the same job graph. (More 
explained later).

The abstraction of the Async Sink is a sink, meaning it is a terminal operator 
in the job graph. The abstraction is intended to simplify the writing of a sink 
- meaning the base implementation will handle batching, state management and 
rate limiting. You only need to provide the client and request structure to be 
used to interact with the external service. This makes writing and maintaining 
the sink easier (if you simply want to write to a destination with at least 
once processing).

The AsyncFunction, as I understand it, is more used for data enrichment, and is 
not a terminal operator in the job graph. This means the return value from the 
external service will continue to be passed on down the Flink Job graph. This 
is useful for data enrichment using the external service, or if we want to 
ensure the system being called in the AsyncFunction is updated BEFORE any data 
is written to the sinks further down the job graph.

For example:

Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink

We can be sure that the updates to DynamoDB for a particular record happens 
before the record is written to the Kinesis Sink.


Hope the above clarifies your question!

Regards,
Hong


On 14 Jun 2023, at 19:27, Lu Niu mailto:qqib...@gmail.com>> 
wrote:




Hi, Flink dev and users

If I want to async write to an external service, which API shall I use, 
AsyncFunction or Async Sink?

My understanding after checking the code are:

  1.  Both APIs guarantee at-least-once writes to the external service, as both 
APIs internally store in-flight requests in the checkpoint.
  2.  Async Sink provides a batching request feature. This can be implemented 
with Map + AsyncFunction: a Map function groups requests into batches and passes 
them to the AsyncFunction. The batching implementation can refer to 
AbstractMapBundleOperator if you don't want to use state.
  3.  Async Sink supports retry on failed requests. AsyncFunction also supports 
retry in the latest Flink version.
  4.  Async Sink supports rate limiting; AsyncFunction doesn't.
  5.  AsyncFunction can be used to implement read-update-write; Async Sink 
cannot.

Best

Lu




[jira] [Created] (FLINK-32359) AdaptiveSchedulerBuilder should accept executor service in constructor

2023-06-15 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32359:


 Summary: AdaptiveSchedulerBuilder should accept executor service 
in constructor
 Key: FLINK-32359
 URL: https://issues.apache.org/jira/browse/FLINK-32359
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Reporter: Chesnay Schepler
 Fix For: 1.18.0


The ASBuilder currently accepts mandatory arguments in both the constructor and 
final {{build()}} method.
This makes it difficult to create composite helper factory methods, since you 
always need to pass a special value in build(), usually leaking details of the 
test setup.
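
A hedged illustration of the intended change (signatures simplified, not the 
real test classes):

{code:java}
// Before: the executor must be threaded through every helper into build().
//   AdaptiveScheduler scheduler = builder.build(executorService);

// After: all mandatory arguments live in the constructor, so composite
// factory methods no longer need to leak the executor into build().
//   AdaptiveSchedulerBuilder builder =
//       new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor, executorService);
//   AdaptiveScheduler scheduler = builder.build();
{code}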



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-15 Thread Becket Qin
Hi Xintong,

I think the key of stability guarantees is about not breaking the
> commitments we made to users. From my understanding, when we mark an API as
> Public in for example 1.18, we make the following two commitments.
> a) The API will not be removed in all the 1.x serials.
> b) The API will be kept at least 2 minor releases after being marked
> Deprecated.

There is one more thing we need to follow, which is not breaking
conventions. Public API is a well-defined common concept, and one of its
conventions is that it only changes with a major version change. While it is
fine for us to define our own graduation path of an API, like Experimental
=> PublicEvolving => Public, once an API reaches the Public API state, we
need to follow the common convention. Not every project has Experimental /
PublicEvolving APIs, and even for those that do have similar annotations, I
am not aware of any demotion process. So the demotion of API stability is
something extra we put on our users, which is specific to Flink and breaks
the Public API convention.

As far as I understand, the reason for the demotion of a Public API is to
allow us to remove it without a major version bump. But it should be
completely OK to bump up the major version if we really want to get rid of
a public API, right? Isn't it just a version change which has almost no
additional cost to us? As an example, Kafka deprecated the old consumer in
0.11, and kept for 1.0 and 1.1, then the community bumped up the major
version to 2.0 and removed the code. So there are only two 1.x minor
versions, while 2.x has 8 minor versions and 3.x has 5 minor versions at
present.

I think the first priority about API stability is to make the end users
happy. Sometimes this does mean there will be maintenance overhead for us
as Flink maintainers. When there is a conflict and no way around it, having
some trade-off is reasonable. However, in this particular case, there seems
no material benefit of having a stability demotion process while it does
weaken the user experience.

Thanks,

Jiangjie (Becket) Qin



On Thu, Jun 15, 2023 at 7:31 PM Martijn Visser 
wrote:

> Hi all,
>
> First off, thanks for opening the discussion on this topic. I think it's an
> important one.
>
> From my perspective, I think that one of the major pain points that users
> have expressed over time, is the API stability of Flink overall. I think
> that every Flink user wants to have reliable, stable and good APIs, and
> that this should be an essential thing for the Flink community to achieve.
> I think there's still a lot of room to improve here. I think that Spark has
> a nice overview of considerations when to break an API [1]. What I really
> like and would emphasize is that they strive to avoid breaking APIs or
> silently changing behaviour. I think we should do the same for the Flink
> APIs.
>
> Overall, I would agree with the proposal regarding deprecation for both
> Experimental and PublicEvolving. I do think that we can only enforce this
> if we actually enforce the opposite as well (which is the promotion from
> Experimental -> PublicEvolving -> Public).
>
> What I don't think is a good principle is the example where a Public API
> is deprecated in 1.20 and the next release is 2.0, with the requirement to
> keep it there until the next major release. I can see why it's being
> proposed (because it would avoid the user needing to change their
> implementation towards a new API), but the user is already faced with the
> situation that their implementation must be changed, given the fact that
> the community decided to go for a new major version. That already implies
> breaking changes and impact for the user. I think it's the primary reason
> why there is no Flink 2.0 yet.
>
> I'm not in favour of downgrading APIs from Public -> PublicEvolving ->
> Experimental. When doing that, we are breaking the contract with the Flink
> users who believe they are on an API that won't break, only to figure out a
> couple of releases later that this has actually happened.
>
> I believe we should treat API Stability as a first class citizen, so that
> each API is annotated (either Internal, else it's considered public with an
> annotation of either Experimental, PublicEvolving or Public) and users know
> how to rely on them. An accepted deprecation proposal will only help our
> users in understanding the guarantees that they can expect.
>
> Best regards,
>
> Martijn
>
> [1] https://spark.apache.org/versioning-policy.html
>
> On Thu, Jun 15, 2023 at 5:29 AM Xintong Song 
> wrote:
>
> > I agree that Public APIs should require a longer migration period. I
> think
> > that is why the FLIP requires at least 2 minor releases (compared to 1
> > minor release for PublicEvolving and 1 patch release for Experimental).
> >
> > I think the key of stability guarantees is about not breaking the
> > commitments we made to users. From my understanding, when we mark an API
> as
> > Public in for example 1.18, we make the 

[jira] [Created] (FLINK-32358) CI may unintentionally use fallback akka loader

2023-06-15 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32358:


 Summary: CI may unintentionally use fallback akka loader
 Key: FLINK-32358
 URL: https://issues.apache.org/jira/browse/FLINK-32358
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System / CI
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


We have a fallback akka loader, for developer convenience in the IDE, that is on 
the classpath of most modules. Depending on the order of jars on the classpath, 
it can happen that the fallback loader appears first, which we don't want 
because it slows down the build and creates noisy logs.

We can add a simple prioritization scheme to the RPC system loading to remedy 
that.
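
A hedged sketch of such a prioritization scheme (the getPriority() method and 
the simplified loader interface are illustrative, not the actual Flink code):

{code:java}
import java.util.Comparator;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;

public final class PrioritizedRpcSystemLoading {
    /** Simplified stand-in for the real loader SPI. */
    public interface RpcSystemLoader {
        int getPriority(); // the fallback loader would return the lowest value
    }

    public static RpcSystemLoader load() {
        // Pick the highest-priority loader so the fallback is only used
        // when nothing else is on the classpath.
        return StreamSupport.stream(
                        ServiceLoader.load(RpcSystemLoader.class).spliterator(), false)
                .max(Comparator.comparingInt(RpcSystemLoader::getPriority))
                .orElseThrow(() -> new IllegalStateException("No RpcSystemLoader found"));
    }
}
{code}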



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener

2023-06-15 Thread Feng Jin
+1(no-binding)

Best,
Feng


On Thu, Jun 15, 2023 at 8:33 PM liu ron  wrote:

> +1(no-binding)
>
> Best,
> Ron
>
> yuxia wrote on Thu, Jun 15, 2023 at 20:15:
>
> > +1 (binding)
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "Leonard Xu" 
> > To: "dev" 
> > Sent: Thursday, June 15, 2023, 8:09:00 PM
> > Subject: Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener
> >
> > +1 (binding)
> >
> >
> > Best,
> > Leonard
> >
> > > On Jun 14, 2023, at 11:21 PM, Jing Ge 
> > wrote:
> > >
> > > +1 (binding)
> > >
> > > Best Regards,
> > > Jing
> > >
> > > On Wed, Jun 14, 2023 at 4:07 PM Benchao Li 
> wrote:
> > >
> > >> +1 (binding)
> > >>
> > >> Shammon FY wrote on Wed, Jun 14, 2023 at 19:52:
> > >>
> > >>> Hi all:
> > >>>
> > >>> Thanks for all the feedback for FLIP-294: Support Customized Catalog
> > >>> Modification Listener [1]. I would like to start a vote for it
> > according
> > >> to
> > >>> the discussion in thread [2].
> > >>>
> > >>> The vote will be open for at least 72 hours(excluding weekends, until
> > >> June
> > >>> 19, 19:00 PM GMT) unless there is an objection or an insufficient
> > number
> > >> of
> > >>> votes.
> > >>>
> > >>>
> > >>> [1]
> > >>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
> > >>> [2] https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
> > >>>
> > >>>
> > >>> Best,
> > >>> Shammon FY
> > >>>
> > >>
> > >>
> > >> --
> > >>
> > >> Best,
> > >> Benchao Li
> > >>
> >
>


Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener

2023-06-15 Thread liu ron
+1(no-binding)

Best,
Ron

yuxia wrote on Thu, Jun 15, 2023 at 20:15:

> +1 (binding)
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Leonard Xu" 
> To: "dev" 
> Sent: Thursday, June 15, 2023, 8:09:00 PM
> Subject: Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener
>
> +1 (binding)
>
>
> Best,
> Leonard
>
> > On Jun 14, 2023, at 11:21 PM, Jing Ge 
> wrote:
> >
> > +1 (binding)
> >
> > Best Regards,
> > Jing
> >
> > On Wed, Jun 14, 2023 at 4:07 PM Benchao Li  wrote:
> >
> >> +1 (binding)
> >>
> >> Shammon FY wrote on Wed, Jun 14, 2023 at 19:52:
> >>
> >>> Hi all:
> >>>
> >>> Thanks for all the feedback for FLIP-294: Support Customized Catalog
> >>> Modification Listener [1]. I would like to start a vote for it
> according
> >> to
> >>> the discussion in thread [2].
> >>>
> >>> The vote will be open for at least 72 hours(excluding weekends, until
> >> June
> >>> 19, 19:00 PM GMT) unless there is an objection or an insufficient
> number
> >> of
> >>> votes.
> >>>
> >>>
> >>> [1]
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
> >>> [2] https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
> >>>
> >>>
> >>> Best,
> >>> Shammon FY
> >>>
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >>
>


Re: [DISCUSS] Status of Statefun Project

2023-06-15 Thread Martijn Visser
Let me know if you have a PR for a Flink update :)

On Thu, Jun 8, 2023 at 5:52 PM Galen Warren via user 
wrote:

> Thanks Martijn.
>
> Personally, I'm already using a local fork of Statefun that is compatible
> with Flink 1.16.x, so I wouldn't have any need for a released version
> compatible with 1.15.x. I'd be happy to do the PRs to modify Statefun to
> work with new versions of Flink as they come along.
>
> As for testing, Statefun does have unit tests and Gordon also sent me
> instructions a while back for how to do some additional smoke tests which
> are pretty straightforward. Perhaps he could weigh in on whether the
> combination of automated tests plus those smoke tests should be sufficient
> for testing with new Flink versions (I believe the answer is yes).
>
> -- Galen
>
>
>
> On Thu, Jun 8, 2023 at 8:01 AM Martijn Visser 
> wrote:
>
>> Hi all,
>>
>> Apologies for the late reply.
>>
>> I'm willing to help out with merging requests in Statefun to keep them
>> compatible with new Flink releases and create new releases. I do think
>> that
>> validation of the functionality of these releases depends a lot on those
>> who do these compatibility updates, with PMC members helping out with the
>> formal process.
>>
>> > Why can't the Apache Software Foundation allow community members to
>> bring
>> it up to date?
>>
>> There's nothing preventing anyone from reviewing any of the current PRs or
>> opening new ones. However, none of them are approved [1], so there's also
>> nothing to merge.
>>
>> > I believe that there are people and companies on this mailing list
>> interested in supporting Apache Flink Stateful Functions.
>>
>> If so, then now is the time to show.
>>
>> Would there be a preference to create a release with Galen's merged
>> compatibility update to Flink 1.15.2, or do we want to skip that and go
>> straight to a newer version?
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>>
>> https://github.com/apache/flink-statefun/pulls?q=is%3Apr+is%3Aopen+review%3Aapproved
>>
>> On Tue, Jun 6, 2023 at 3:55 PM Marco Villalobos <
>> mvillalo...@kineteque.com>
>> wrote:
>>
>> > Why can't the Apache Software Foundation allow community members to
>> bring
>> > it up to date?
>> >
>> > What's the process for that?
>> >
>> > I believe that there are people and companies on this mailing list
>> > interested in supporting Apache Flink Stateful Functions.
>> >
>> > You already had two people on this thread express interest.
>> >
>> > At the very least, we could keep the library versions up to date.
>> >
>> > There are only a small list of new features that might be worthwhile:
>> >
>> > 1. event time processing
>> > 2. state rest api
>> >
>> >
>> > On Jun 6, 2023, at 3:06 AM, Chesnay Schepler 
>> wrote:
>> >
>> > If you were to fork it *and want to redistribute it* then the short
>> > version is that
>> >
>> >1. you have to adhere to the Apache licensing requirements
>> >2. you have to make it clear that your fork does not belong to the
>> >Apache Flink project. (Trademarks and all that)
>> >
>> > Neither should be significant hurdles (there should also be plenty of
>> > online resources regarding 1), and if you do this then you can freely
>> share
>> > your fork with others.
>> >
>> > I've also pinged Martijn to take a look at this thread.
>> > To my knowledge the project hasn't decided anything yet.
>> >
>> > On 27/05/2023 04:05, Galen Warren wrote:
>> >
>> > Ok, I get it. No interest.
>> >
>> > If this project is being abandoned, I guess I'll work with my own fork.
>> Is
>> > there anything I should consider here? Can I share it with other people
>> who
>> > use this project?
>> >
>> > On Tue, May 16, 2023 at 10:50 AM Galen Warren 
>> 
>> > wrote:
>> >
>> >
>> > Hi Martijn, since you opened this discussion thread, I'm curious what
>> your
>> > thoughts are in light of the responses? Thanks.
>> >
>> > On Wed, Apr 19, 2023 at 1:21 PM Galen Warren 
>> 
>> > wrote:
>> >
>> >
>> > I use Apache Flink for stream processing, and StateFun as a hand-off
>> >
>> > point for the rest of the application.
>> > It serves well as a bridge between a Flink Streaming job and
>> > micro-services.
>> >
>> > This is essentially how I use it as well, and I would also be sad to see
>> > it sunsetted. It works well; I don't know that there is a lot of new
>> > development required, but if there are no new Statefun releases, then
>> > Statefun can only be used with older Flink versions.
>> >
>> > On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>> >
>> >
>> > I am currently using Stateful Functions in my application.
>> >
>> > I use Apache Flink for stream processing, and StateFun as a hand-off
>> > point for the rest of the application.
>> > It serves well as a bridge between a Flink Streaming job and
>> > micro-services.
>> >
>> > I would be disappointed if StateFun was sunsetted. It's a good idea.
>> >
>> > If there is anything I can do to help, as a contributor perhaps, please

[DISCUSS] FLIP-303: Support REPLACE TABLE AS SELECT statement

2023-06-15 Thread yuxia
Hi, devs. 
As FLIP-218[1] & FLIP-305[2], which add support for the CREATE TABLE AS SELECT 
statement in Flink, have been accepted, 
I would like to start a discussion about FLIP-303: Support REPLACE TABLE AS 
SELECT statement[3] to complete this family of statements. 
With the REPLACE TABLE AS SELECT statement, users won't need to first drop the 
table and then use CREATE TABLE AS SELECT. Since the statement is quite similar 
to the CREATE TABLE AS statement, the design is quite similar to FLIP-218[1] 
& FLIP-305[2], apart from some parts specific to the REPLACE TABLE AS SELECT 
statement. 
Just a kind reminder: to understand this FLIP better, you may need to read 
FLIP-218[1] & FLIP-305[2] to get more context. 
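
For readers who haven't opened the FLIP yet, a rough usage sketch (table names 
are illustrative; see the FLIP for the exact syntax and semantics): 

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  public class ReplaceTableAsSelectSketch {
      public static void main(String[] args) {
          TableEnvironment tEnv =
                  TableEnvironment.create(EnvironmentSettings.inBatchMode());
          // Replaces my_sink's definition and data with the query result in one
          // statement, without a separate DROP TABLE + CREATE TABLE AS SELECT.
          tEnv.executeSql("REPLACE TABLE my_sink AS SELECT id, name FROM my_source");
      }
  }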

Look forward to your feedback. 

[1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185 
[2]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
 
[3]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
 

:) I just noticed I missed "[DISCUSS]" in the title of the previous email [4], so I 
am sending it again here with the correct email title. Please ignore the previous 
email and discuss in this thread. 
Sorry for the noise. 

[4]: https://lists.apache.org/thread/jy39xwxn1o2035y5411xynwtbyfgg76t 


Best regards, 
Yuxia 


Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener

2023-06-15 Thread yuxia
+1 (binding)

Best regards,
Yuxia

- Original Message -
From: "Leonard Xu" 
To: "dev" 
Sent: Thursday, June 15, 2023, 8:09:00 PM
Subject: Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener

+1 (binding)


Best,
Leonard

> On Jun 14, 2023, at 11:21 PM, Jing Ge  wrote:
> 
> +1 (binding)
> 
> Best Regards,
> Jing
> 
> On Wed, Jun 14, 2023 at 4:07 PM Benchao Li  wrote:
> 
>> +1 (binding)
>> 
>> Shammon FY wrote on Wed, Jun 14, 2023 at 19:52:
>> 
>>> Hi all:
>>> 
>>> Thanks for all the feedback for FLIP-294: Support Customized Catalog
>>> Modification Listener [1]. I would like to start a vote for it according
>> to
>>> the discussion in thread [2].
>>> 
>>> The vote will be open for at least 72 hours(excluding weekends, until
>> June
>>> 19, 19:00 PM GMT) unless there is an objection or an insufficient number
>> of
>>> votes.
>>> 
>>> 
>>> [1]
>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
>>> [2] https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
>>> 
>>> 
>>> Best,
>>> Shammon FY
>>> 
>> 
>> 
>> --
>> 
>> Best,
>> Benchao Li
>>


Re: [VOTE] FLIP-294: Support Customized Catalog Modification Listener

2023-06-15 Thread Leonard Xu
 
+1 (binding)


Best,
Leonard

> On Jun 14, 2023, at 11:21 PM, Jing Ge  wrote:
> 
> +1 (binding)
> 
> Best Regards,
> Jing
> 
> On Wed, Jun 14, 2023 at 4:07 PM Benchao Li  wrote:
> 
>> +1 (binding)
>> 
>> Shammon FY wrote on Wed, Jun 14, 2023 at 19:52:
>> 
>>> Hi all:
>>> 
>>> Thanks for all the feedback for FLIP-294: Support Customized Catalog
>>> Modification Listener [1]. I would like to start a vote for it according
>> to
>>> the discussion in thread [2].
>>> 
>>> The vote will be open for at least 72 hours(excluding weekends, until
>> June
>>> 19, 19:00 PM GMT) unless there is an objection or an insufficient number
>> of
>>> votes.
>>> 
>>> 
>>> [1]
>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
>>> [2] https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
>>> 
>>> 
>>> Best,
>>> Shammon FY
>>> 
>> 
>> 
>> --
>> 
>> Best,
>> Benchao Li
>> 



Re: [ANNOUNCE] Updates to Flink's external connector CI workflows

2023-06-15 Thread Leonard Xu
Thanks Martijn for the great work !


Best,
Leonard



[jira] [Created] (FLINK-32356) Add document for calling procedure

2023-06-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-32356:


 Summary: Add document for calling procedure
 Key: FLINK-32356
 URL: https://issues.apache.org/jira/browse/FLINK-32356
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32357) Elasticsearch v3.0 won't compile when testing against Flink 1.17.1

2023-06-15 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32357:
--

 Summary: Elasticsearch v3.0 won't compile when testing against 
Flink 1.17.1
 Key: FLINK-32357
 URL: https://issues.apache.org/jira/browse/FLINK-32357
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Reporter: Martijn Visser


{code:java}
[INFO] 
Error:  Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on 
project flink-connector-elasticsearch-base: Execution default-test of goal 
org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: 
org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' failed 
to discover tests: 
com.tngtech.archunit.lang.syntax.elements.MethodsThat.areAnnotatedWith(Ljava/lang/Class;)Ljava/lang/Object;
 -> [Help 1]
{code}

https://github.com/apache/flink-connector-elasticsearch/actions/runs/5277721611/jobs/9546112876#step:13:159





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32355) Support to list procedure

2023-06-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-32355:


 Summary: Support to list procedure
 Key: FLINK-32355
 URL: https://issues.apache.org/jira/browse/FLINK-32355
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32353) Make Cassandra connector compatible with Flink 1.18

2023-06-15 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32353:
--

 Summary: Make Cassandra connector compatible with Flink 1.18
 Key: FLINK-32353
 URL: https://issues.apache.org/jira/browse/FLINK-32353
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Cassandra
Reporter: Martijn Visser


The current Cassandra connector in {{main}} fails when testing against Flink 
1.18-SNAPSHOT

{code:java}
Error:  Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 8.1 s 
<<< FAILURE! - in org.apache.flink.architecture.rules.ITCaseRules
Error:  ITCaseRules.ITCASE_USE_MINICLUSTER  Time elapsed: 0.025 s  <<< FAILURE!
java.lang.AssertionError: 
Architecture Violation [Priority: MEDIUM] - Rule 'ITCASE tests should use a 
MiniCluster resource or extension' was violated (1 times):
org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does 
not satisfy: only one of the following predicates match:
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields 
that are static, final, and of type MiniClusterExtension and annotated with 
@RegisterExtension or are , and of type MiniClusterTestEnvironment and 
annotated with @TestEnv
* reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension
 or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
{code}

https://github.com/apache/flink-connector-cassandra/actions/runs/5276835802/jobs/9544092571#step:13:811



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32354) Support to execute the call procedure operation

2023-06-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-32354:


 Summary: Support to execute the call procedure operation
 Key: FLINK-32354
 URL: https://issues.apache.org/jira/browse/FLINK-32354
 Project: Flink
  Issue Type: Sub-task
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32352) Support convert call procedure statement to corresponding operation

2023-06-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-32352:


 Summary: Support convert call procedure statement to corresponding 
operation
 Key: FLINK-32352
 URL: https://issues.apache.org/jira/browse/FLINK-32352
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32351) Introduce base interfaces for call procedure

2023-06-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-32351:


 Summary: Introduce base interfaces for call procedure
 Key: FLINK-32351
 URL: https://issues.apache.org/jira/browse/FLINK-32351
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] Release flink-connector-jdbc v3.1.1, release candidate #2

2023-06-15 Thread Martijn Visser
Hi everyone,
Please review and vote on the release candidate #2 for the version 3.1.1,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which are signed with the key with fingerprint
A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag v3.1.1-rc2 [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Release Manager

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353281
[2]
https://dist.apache.org/repos/dist/dev/flink/flink-connector-jdbc-3.1.1-rc2
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1642
[5] https://github.com/apache/flink-connector-jdbc/releases/tag/v3.1.1-rc2
[6] https://github.com/apache/flink-web/pull/654


[jira] [Created] (FLINK-32350) FLIP-311: Support Call Stored Procedure

2023-06-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-32350:


 Summary: FLIP-311: Support Call Stored Procedure
 Key: FLINK-32350
 URL: https://issues.apache.org/jira/browse/FLINK-32350
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: luoyuxia
Assignee: luoyuxia


Umbrella issue for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Lijie Wang
Hi Benchao and Aitozi,

Thanks for your feedback about this FLIP.

@Benchao

>> I think it would be reasonable to also support "pipeline shuffle" if
possible.
As I said above, runtime filter can work well with all shuffle mode,
including pipeline shuffle.

>> if the RuntimeFilterBuilder could be done more quickly than the
RuntimeFilter operator, it can still filter out additional data afterwards.
I think the main purpose of the runtime filter is to reduce the shuffle data
and the data arriving at the join. Although eagerly running the large
table side can process data in advance, most of that data may be
irrelevant, causing huge shuffle overhead and slowing down the join. In
addition, if the join is a hash-join, the probe side of the hash-join also
needs to wait for its build side to complete, so the large table side is
likely to be backpressured.
Also, I don't intend to add too many configuration options in the
first version, as they may make the feature more difficult to use (users
need to understand a lot of internal implementation details). Maybe it
could be a future improvement (if it's worthwhile)?


@Aitozi

>> IMO, In the current implementation two source table operators will be
executed simultaneously.
The example in the FLIP uses blocking shuffle (I will add this point to the
FLIP). The runtime filter is generally chained with the large table side to
reduce the shuffle data (as shown in Figure 2 of the FLIP). The job vertices should be
scheduled in topological order, so the large table side can only be
scheduled after the RuntimeFilterBuilder finishes.

>> Are there some tests to show the default value of
table.optimizer.runtime-filter.min-probe-data-size 10G is a good default
value.
It's not tested yet, but it will be done before merging the code. The current
value refers to systems such as Spark and Hive. Before the code is merged, we
will test on TPC-DS 10T to find an optimal set of values. If you have
relevant experience with it, suggestions are welcome.

>> What's the representation of the runtime filter node in the planner?
As shown in Figure 1 of the FLIP, we intend to add two new physical nodes,
RuntimeFilterBuilder and RuntimeFilter.

Best,
Lijie

Aitozi wrote on Thu, Jun 15, 2023 at 15:52:

> Hi Lijie,
>
> Nice to see this valuable feature. After reading the FLIP I have some
> questions below:
>
> >Schedule the TableSource(dim) first.
>
> How does it know to schedule the TableSource(dim) first? IMO, in the
> current implementation the two source table operators will be executed
> simultaneously.
>
> >If the data volume on the probe side is too small, the overhead of
> building runtime filter is not worth it.
>
> Are there some tests to show that the default value of
> table.optimizer.runtime-filter.min-probe-data-size (10G) is a good default
> value? The same for table.optimizer.runtime-filter.max-build-data-size.
>
> >the runtime filter can be pushed down along the probe side, as close to
> data sources as possible
>
> What's the representation of the runtime filter node in the planner? Is it
> a Filter node?
>
> Best,
>
> Aitozi.
>
> Benchao Li wrote on Thu, Jun 15, 2023 at 14:30:
>
> > Hi Lijie,
> >
> > Regarding the shuffle mode, I think it would be reasonable to also
> support
> > "pipeline shuffle" if possible.
> >
> > "pipeline shuffle" is a essential for OLAP/MPP computing, although this
> has
> > not been much exposed to users for now, I know a few companies that uses
> > Flink as a MPP computing engine, and there is an ongoing effort[1] to
> make
> > this usage more powerful.
> >
> > Back to your concern that "Even if the RuntimeFilter becomes running
> before
> > the RuntimeFilterBuilder finished, it will not process any data and will
> > occupy resources", whether it benefits us depends on the scale of data,
> if
> > the RuntimeFilterBuilder could be done more quickly than the RuntimeFilter
> operator,
> > it can still filter out additional data afterwards. Hence in my opinion,
> we
> > do not need to make the edge between RuntimeFilterBuilder and
> RuntimeFilter
> > BLOCKING only, at least it can be configured.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-25318
> >
> > Lijie Wang wrote on Thu, Jun 15, 2023 at 14:18:
> >
> > > Hi Yuxia,
> > >
> > > I made a mistake in the above response.
> > >
> > > The runtime filter can work well with all shuffle modes. However, hybrid
> > > shuffle and blocking shuffle are currently recommended for batch jobs
> > > (pipeline shuffle is not recommended).
> > >
> > > One more thing to mention here is that we will force the edge between
> > > RuntimeFilterBuilder and RuntimeFilter to be BLOCKING(regardless of
> which
> > > BatchShuffleMode is set). Because the RuntimeFilter really doesn’t need
> > to
> > > run before the RuntimeFilterBuilder finishes. Even if the RuntimeFilter
> > > starts running before the RuntimeFilterBuilder finishes, it will not
> > > process any data and will occupy resources.
> > >
> > > Best,
> > > Lijie
> > >
> > > Lijie Wang wrote on Thu, Jun 15, 2023 at 09:48:
> > >
> > > > Hi Yuxia,
> > > >
> > > > Thanks for your 

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

2023-06-15 Thread Robert Metzger
Thanks for the FLIP.

Some comments:
1. Can you specify the full proposed configuration name? "
scaling-cooldown-period" is probably not the full config name?
2. Why is the concept of scaling events and a scaling queue needed? If I
remember correctly, the adaptive scheduler will just check how many
TaskManagers are available and then adjust the execution graph accordingly.
There's no need to store a number of scaling events. We just need to
determine the time to trigger an adjustment of the execution graph.
3. What's the behavior wrt to JobManager failures (e.g. we lose the state
of the Adaptive Scheduler?). My proposal would be to just reset the
cooldown period, so after recovery of a JobManager, we have to wait at
least for the cooldown period until further scaling operations are done.
4. What's the relationship to the
"jobmanager.adaptive-scheduler.resource-stabilization-timeout"
configuration?

Thanks a lot for working on this!

Best,
Robert

On Wed, Jun 14, 2023 at 3:38 PM Etienne Chauchot 
wrote:

> Hi all,
>
> @Yuxia, I updated the FLIP to include the aggregation of the stacked
> operations that we discussed below. PTAL.
>
> Best
>
> Etienne
>
>
> Le 13/06/2023 à 16:31, Etienne Chauchot a écrit :
> > Hi Yuxia,
> >
> > Thanks for your feedback. The number of potentially stacked operations
> > depends on the configured length of the cooldown period.
> >
> >
> >
> > The proposition in the FLIP is to add a minimum delay between 2 scaling
> > operations. But, indeed, an optimization could be to still stack the
> > operations (that arrive during a cooldown period) but maybe not take
> > only the last operation but rather aggregate them in order to end up
> > with a single aggregated operation when the cooldown period ends. For
> > example, let's say 3 taskManagers come up and 1 comes down during the
> > cooldown period, we could generate a single operation of scale up +2
> > when the period ends.
> >
> > As a side note regarding your comment on "it'll take a long time to
> > finish all", please keep in mind that the reactive mode (at least for
> > now) is only available for streaming pipelines, which are in essence
> > infinite processing.
> >
> > Another side note: when you mention "every taskManagers connecting",
> > if you are referring to the start of the pipeline, please keep in mind
> > that the adaptive scheduler has a "waiting for resources" timeout
> > period before starting the pipeline in which all taskmanagers connect
> > and the parallelism is decided.
> >
> > Best
> >
> > Etienne
> >
> > Le 13/06/2023 à 03:58, yuxia a écrit :
> >> Hi, Etienne. Thanks for driving it. I have one question about the
> >> mechanism of the cooldown timeout.
> >>
> >> From the Proposed Changes part, if a scaling event is received and
> >> it falls during the cooldown period, it'll be stacked to be executed
> >> after the period ends. Also, from the description of FLINK-21883[1],
> >> cooldown timeout is to avoid rescaling the job very frequently,
> >> because TaskManagers are not all connecting at the same time.
> >>
> >> So, is it possible that every TaskManager connecting will produce a
> >> scaling event, and they'll be stacked into many scale-up events, which
> >> means it'll take a long time to finish them all? Can we just take the
> >> last event?
> >>
> >> [1]: https://issues.apache.org/jira/browse/FLINK-21883
> >>
> >> Best regards, Yuxia
> >>
> >> - Original Message - From: "Etienne Chauchot" 
> >> To:
> >> "dev" , "Robert Metzger" 
> >> Sent: Monday, June 12, 2023, 11:34:25 PM Subject: [DISCUSS] FLIP-322
> >> Cooldown
> >> period for adaptive scheduler
> >>
> >> Hi,
> >>
> >> I’d like to start a discussion about FLIP-322 [1] which introduces a
> >> cooldown period for the adaptive scheduler.
> >>
> >> I'd like to get your feedback especially @Robert as you opened the
> >> related ticket and worked on the reactive mode a lot.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
> >>
> >>
> >>
> > Best
> >>
> >> Etienne

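To illustrate the aggregation idea from the quoted discussion (three
TaskManagers coming up and one going down during the cooldown collapsing
into a single scale-up of +2), here is a minimal sketch; the class and
method names are hypothetical and do not reflect the adaptive scheduler's
actual API.

{code:java}
import java.util.List;

// Sketch: instead of replaying every scaling event queued during the
// cooldown period, collapse them into one net delta when the period ends.
public class ScalingEventAggregator {

    /** +n for TaskManagers joining, -n for TaskManagers leaving. */
    record ScalingEvent(int slotDelta) {}

    static int aggregate(List<ScalingEvent> queuedDuringCooldown) {
        return queuedDuringCooldown.stream()
                .mapToInt(ScalingEvent::slotDelta)
                .sum();
    }

    public static void main(String[] args) {
        // Three TaskManagers came up and one went down during the cooldown:
        List<ScalingEvent> events = List.of(
                new ScalingEvent(+1), new ScalingEvent(+1),
                new ScalingEvent(+1), new ScalingEvent(-1));
        // A single aggregated rescale of +2 is applied when the period ends.
        System.out.println("net delta = " + aggregate(events));
    }
}
{code}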

FLIP-303: Support REPLACE TABLE AS SELECT statement

2023-06-15 Thread yuxia
Hi, devs. 
As FLIP-218[1] & FLIP-305[2], which enable Flink to support the CREATE TABLE AS 
SELECT statement, have been accepted, 
I would like to start a discussion about FLIP-303: Support REPLACE TABLE AS 
SELECT statement[3] to complete this family of statements. 
With the REPLACE TABLE AS SELECT statement, users won't need to first drop the 
table and then use CREATE TABLE AS SELECT. Since the statement is very similar 
to the CREATE TABLE AS statement, the design closely follows FLIP-218[1] 
& FLIP-305[2], apart from some parts specific to the REPLACE TABLE AS SELECT 
statement. 
Just a kind reminder: to understand this FLIP better, you may need to read 
FLIP-218[1] & FLIP-305[2] for more context. 

Look forward to your feedback. 

[1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185 
[2]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
 
[3]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-303%3A+Support+REPLACE+TABLE+AS+SELECT+statement
 

Best regards, 
Yuxia 

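To make the relationship to CTAS concrete, here is a minimal sketch from the
Table API, assuming the REPLACE TABLE AS SELECT syntax proposed in the FLIP;
the table names are invented for the example and the final syntax may differ.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReplaceTableAsSelectExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Today, refreshing a derived table takes two statements:
        tEnv.executeSql("DROP TABLE IF EXISTS daily_orders");
        tEnv.executeSql(
                "CREATE TABLE daily_orders AS SELECT * FROM orders WHERE dt = '2023-06-15'");

        // With FLIP-303, the two steps collapse into a single statement:
        tEnv.executeSql(
                "REPLACE TABLE daily_orders AS SELECT * FROM orders WHERE dt = '2023-06-15'");
    }
}
{code}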


Re: [VOTE] FLIP-311: Support Call Stored Procedure

2023-06-15 Thread yuxia
Hi, all.
FLIP-311 [1] has been accepted.
There are 5 binding votes, 3 non-binding votes:
- Benchao Li (binding)
- Rui Fan (binding)
- Jingsong Li (binding)
- Lincoln (binding)
- Jing Ge (binding)


- Mang Zhang (non-binding)
- Feng Jin (non-binding)
- Jane Chan (non-binding)

Thanks everyone.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure


Best regards,
Yuxia

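For illustration, invoking a procedure could look roughly like the sketch
below, assuming the CALL syntax from the FLIP; the catalog and procedure
names are invented for the example, and procedures are expected to be
provided by catalogs.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CallProcedureExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assume a catalog `my_catalog` that exposes a `compact` procedure.
        tEnv.executeSql("USE CATALOG my_catalog");

        // Invoke the procedure and print its result rows.
        tEnv.executeSql("CALL `system`.compact('my_db', 'my_table')").print();
    }
}
{code}
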
- Original Message -
From: "Jane Chan" 
To: "dev" 
Sent: Thursday, June 15, 2023, 11:11:33 AM
Subject: Re: Re: [VOTE] FLIP-311: Support Call Stored Procedure

+1 (non-binding)

Best,
Jane

On Wed, Jun 14, 2023 at 10:41 AM Feng Jin  wrote:

> +1 (no-binding)
>
>
> Best,
> Feng
>
>
> On Wed, Jun 14, 2023 at 7:02 AM Jing Ge 
> wrote:
>
> > +1(binding)
> >
> > Best Regards,
> > Jing
> >
> > On Tue, Jun 13, 2023 at 9:03 AM Mang Zhang  wrote:
> >
> > > +1 (no-binding)
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Best regards,
> > > Mang Zhang
> > >
> > >
> > >
> > >
> > >
> > > On 2023-06-13 13:19:31, "Lincoln Lee" wrote:
> > > >+1 (binding)
> > > >
> > > >Best,
> > > >Lincoln Lee
> > > >
> > > >
> > > >Jingsong Li wrote on Tue, Jun 13, 2023 at 10:07:
> > > >
> > > >> +1
> > > >>
> > > >> On Mon, Jun 12, 2023 at 10:32 PM Rui Fan <1996fan...@gmail.com>
> > wrote:
> > > >> >
> > > >> > +1 (binding)
> > > >> >
> > > >> > Best,
> > > >> > Rui Fan
> > > >> >
> > > >> > On Mon, Jun 12, 2023 at 22:20 Benchao Li 
> > > wrote:
> > > >> >
> > > >> > > +1 (binding)
> > > >> > >
> > > >> > > yuxia wrote on Mon, Jun 12, 2023 at 17:58:
> > > >> > >
> > > >> > > > Hi everyone,
> > > >> > > > Thanks for all the feedback about FLIP-311: Support Call
> Stored
> > > >> > > > Procedure[1]. Based on the discussion [2], we have come to a
> > > >> consensus,
> > > >> > > so
> > > >> > > > I would like to start a vote.
> > > >> > > > The vote will be open for at least 72 hours (until June 15th,
> > > 10:00AM
> > > >> > > GMT)
> > > >> > > > unless there is an objection or an insufficient number of
> votes.
> > > >> > > >
> > > >> > > >
> > > >> > > > [1]
> > > >> > > >
> > > >> > >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> > > >> > > > [2]
> > > https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd
> > > >> > > >
> > > >> > > > Best regards,
> > > >> > > > Yuxia
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > >
> > > >> > > Best,
> > > >> > > Benchao Li
> > > >> > >
> > > >>
> > >
> >
>


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-15 Thread Martijn Visser
Hi all,

First off, thanks for opening the discussion on this topic. I think it's an
important one.

From my perspective, I think that one of the major pain points that users
have expressed over time, is the API stability of Flink overall. I think
that every Flink user wants to have reliable, stable and good APIs, and
that this should be an essential thing for the Flink community to achieve.
I think there's still a lot of room to improve here. I think that Spark has
a nice overview of considerations when to break an API [1]. What I really
like and would emphasize is that they strive to avoid breaking APIs or
silently changing behaviour. I think we should do the same for the Flink
APIs.

Overall, I would agree with the proposal regarding deprecation for both
Experimental and PublicEvolving. I do think that we can only enforce this
if we actually enforce the opposite as well (which is the promotion from
Experimental -> PublicEvolving -> Public).

What I don't think is a good principle is the example where a Public API
is deprecated in 1.20 and the next release is 2.0, with the requirement to
keep it there until the next major release. I can see why it's being
proposed (because it would spare the user from having to change their
implementation towards a new API), but the user is already faced with the
situation that their implementation must be changed, given the fact that
the community decided to go for a new major version. That already implies
breaking changes and impact for the user. I think it's the primary reason
why there is no Flink 2.0 yet.

I'm not in favour of downgrading APIs from Public -> PublicEvolving ->
Experimental. When doing that, we are breaking the contract with the Flink
users who believe they are on an API that won't break, only to figure out a
couple of releases later that this has actually happened.

I believe we should treat API Stability as a first class citizen, so that
each API is annotated (either Internal, else it's considered public with an
annotation of either Experimental, PublicEvolving or Public) and users know
how to rely on them. An accepted deprecation proposal will only help our
users in understanding the guarantees that they can expect.

Best regards,

Martijn

[1] https://spark.apache.org/versioning-policy.html

On Thu, Jun 15, 2023 at 5:29 AM Xintong Song  wrote:

> I agree that Public APIs should require a longer migration period. I think
> that is why the FLIP requires at least 2 minor releases (compared to 1
> minor release for PublicEvolving and 1 patch release for Experimental).
>
> I think the key of stability guarantees is about not breaking the
> commitments we made to users. From my understanding, when we mark an API as
> Public in for example 1.18, we make the following two commitments.
> a) The API will not be removed in all the 1.x serials.
> b) The API will be kept at least 2 minor releases after being marked
> Deprecated.
>
> When there's a major release bump before 2 minor releases, a) is not
> affected, and we should only need to guarantee b) by keeping it in the new
> major release. This is the rationale behind my proposal.
>
> I think my proposal can provide the same migration experience for users as
> if the major release bump has not happened. It should also not affect users
> planning their usage of FlInk if we tell them at the beginning of the new
> major release that the deprecated API will soon be removed.
>
> Best,
>
> Xintong
>
>
>
> On Wed, Jun 14, 2023 at 10:01 PM Becket Qin  wrote:
>
> > Thanks for the explanation, Matthias.
> >
> > In the example you raised, would it be better to just keep both YARN and
> > K8S support in the new major version, but with YARN support deprecated if
> > we want to? We can say for YARN we will only provide bug fixes but no
> > feature development anymore. Given these two features are probably in two
> > independent modules. Keeping both modules in the same new major version
> > likely has zero additional cost compared with maintaining them in two
> > different major versions respectively. This way we don't have the
> > non-linear version issue, have fewer releases, and save a bunch of
> > maintenance effort for multiple development branches.
> >
> > Regarding the stability demotion, I see your point. However, I still feel
> > it a little weird that we demote a Public API to PublicEvolving just for
> > the purpose of code removal in minor versions. This also results in some
> > counter intuitive issues. For example, assuming users only use Public
> APIs,
> > they may be able to upgrade from 1.19.0 to 2.0 fine, but upgrading from
> > 1.19 to 2.1 does not work because the Public API is removed, even though
> > from the users' perspective, both of them are major version upgrades. So,
> > in this case, I would rather bump up the major version again to remove
> the
> > deprecated Public API. That seems simpler and does not complicate the
> well
> > established versioning semantic conventions.
> >
> > Thanks,
> >
> > 

Re: [ANNOUNCE] Updates to Flink's external connector CI workflows

2023-06-15 Thread Ahmed Hamdy
Thanks Martijn
Great improvements for the connector repo stability.
Best regards
Ahmed Hamdy

On Thu, 15 Jun 2023, 12:21 Martijn Visser,  wrote:

> Big thanks to Sergey Nuyanzin for his suggestion on the JVM thread dumps :)
>
> On Thu, Jun 15, 2023 at 1:19 PM Martijn Visser 
> wrote:
>
> > Hi all,
> >
> > I would like to inform you of two changes that have been made to the
> > shared CI workflow that's used for Flink's externalized connectors.
> >
> > 1. Up until now, weekly builds were running to validate that connector
> > code (still) works with Flink. However, these builds were only running
> for
> > code on the "main" branch of the connector, and not for the branches of
> the
> > connector (like v3.0 for Elasticsearch, v1.0 for Opensearch etc). This
> was
> > tracked under https://issues.apache.org/jira/browse/FLINK-31923.
> >
> > That issue has now been fixed, with the Github Action workflow now
> > accepting a map with arrays, which can contain a combination of Flink
> > versions to test for and the connector branch it should test. See
> >
> https://github.com/apache/flink-connector-jdbc/blob/main/.github/workflows/weekly.yml#L28-L47
> > for an example on the Flink JDBC connector
> >
> > This change has already been applied on the externalized connectors GCP
> > PubSub, RabbitMQ, JDBC, Pulsar, MongoDB, Opensearch, Cassandra,
> > Elasticsearch. AWS is pending the merging of the PR. For Kafka, Hive and
> > HBase, since they haven't finished externalization, this isn't applicable
> > to them yet.
> >
> > 2. When working on the debugging of a problem with the JDBC connector,
> one
> > of the things that was needed to debug that problem was the ability to
> see
> > the JVM thread dump. With
> > https://issues.apache.org/jira/browse/FLINK-32331 now completed, every
> > failed CI run will have a JVM thread dump. You can see the implementation
> > for that in
> >
> https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml#L161-L195
> >
> > Best regards,
> >
> > Martijn
> >
>


Re: [VOTE] Release flink-connector-jdbc v3.1.1, release candidate #1

2023-06-15 Thread Martijn Visser
Hi all,

Apologies for the long delay on this one, but with the help of Sergey
Nuyanzin and Joao Boto we managed to resolve this issue. To summarize:

1. The vote for this connector is cancelled, due to
https://issues.apache.org/jira/browse/FLINK-31770
2. That Jira ticket has now been resolved
3. When doing testing, we noticed flaky results and filed
https://issues.apache.org/jira/browse/FLINK-32325. It seems that Surefire
forks and MSSQL server images are not fans of each other, but we mitigated
the issue by setting the forkCount to 1. For a permanent solution we have
https://issues.apache.org/jira/browse/FLINK-32342 to track.

I'll create a new JDBC 3.1.1 release candidate.

Best regards,

Martijn

On Tue, Jun 6, 2023 at 11:37 AM Chesnay Schepler  wrote:

> I'm a bit concerned that the last 4 CI runs haven't succeeded in the 3.1
> branch.
>
> Has anyone looked into the failing oracle test (both 1.17/1.18)?
>
> https://github.com/apache/flink-connector-jdbc/actions/runs/5058372107/jobs/9078398092
>
> Why is a vote being opened when there's still a blocker ticket for this
> very test failure? (FLINK-31770)
>
> How can it be that CI is broken for 2 months and no one notices?
>
> On 24/05/2023 20:54, Martijn Visser wrote:
> > Hi everyone,
> > Please review and vote on the release candidate #1 for the version 3.1.1,
> > as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> > The complete staging area is available for your review, which includes:
> > * JIRA release notes [1],
> > * the official Apache source release to be deployed to dist.apache.org
> [2],
> > which are signed with the key with fingerprint
> > A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag v3.1.1-rc1 [5],
> > * website pull request listing the new release [6].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Release Manager
> >
> > [1]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353281
> > [2]
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-jdbc-3.1.1-rc1
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1636/
> > [5]
> https://github.com/apache/flink-connector-jdbc/releases/tag/v3.1.1-rc1
> > [6] https://github.com/apache/flink-web/pull/654
> >
>
>


Re: [ANNOUNCE] Updates to Flink's external connector CI workflows

2023-06-15 Thread Martijn Visser
Big thanks to Sergey Nuyanzin for his suggestion on the JVM thread dumps :)

On Thu, Jun 15, 2023 at 1:19 PM Martijn Visser 
wrote:

> Hi all,
>
> I would like to inform you of two changes that have been made to the
> shared CI workflow that's used for Flink's externalized connectors.
>
> 1. Up until now, weekly builds were running to validate that connector
> code (still) works with Flink. However, these builds were only running for
> code on the "main" branch of the connector, and not for the branches of the
> connector (like v3.0 for Elasticsearch, v1.0 for Opensearch etc). This was
> tracked under https://issues.apache.org/jira/browse/FLINK-31923.
>
> That issue has now been fixed, with the Github Action workflow now
> accepting a map with arrays, which can contain a combination of Flink
> versions to test for and the connector branch it should test. See
> https://github.com/apache/flink-connector-jdbc/blob/main/.github/workflows/weekly.yml#L28-L47
> for an example on the Flink JDBC connector
>
> This change has already been applied on the externalized connectors GCP
> PubSub, RabbitMQ, JDBC, Pulsar, MongoDB, Opensearch, Cassandra,
> Elasticsearch. AWS is pending the merging of the PR. For Kafka, Hive and
> HBase, since they haven't finished externalization, this isn't applicable
> to them yet.
>
> 2. When working on the debugging of a problem with the JDBC connector, one
> of the things that was needed to debug that problem was the ability to see
> the JVM thread dump. With
> https://issues.apache.org/jira/browse/FLINK-32331 now completed, every
> failed CI run will have a JVM thread dump. You can see the implementation
> for that in
> https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml#L161-L195
>
> Best regards,
>
> Martijn
>


[ANNOUNCE] Updates to Flink's external connector CI workflows

2023-06-15 Thread Martijn Visser
Hi all,

I would like to inform you of two changes that have been made to the shared
CI workflow that's used for Flink's externalized connectors.

1. Up until now, weekly builds were running to validate that connector code
(still) works with Flink. However, these builds were only running for code
on the "main" branch of the connector, and not for the branches of the
connector (like v3.0 for Elasticsearch, v1.0 for Opensearch etc). This was
tracked under https://issues.apache.org/jira/browse/FLINK-31923.

That issue has now been fixed, with the Github Action workflow now
accepting a map with arrays, which can contain a combination of Flink
versions to test for and the connector branch it should test. See
https://github.com/apache/flink-connector-jdbc/blob/main/.github/workflows/weekly.yml#L28-L47
for an example on the Flink JDBC connector

This change has already been applied on the externalized connectors GCP
PubSub, RabbitMQ, JDBC, Pulsar, MongoDB, Opensearch, Cassandra,
Elasticsearch. AWS is pending the merging of the PR. For Kafka, Hive and
HBase, since they haven't finished externalization, this isn't applicable
to them yet.

2. When working on the debugging of a problem with the JDBC connector, one
of the things that was needed to debug that problem was the ability to see
the JVM thread dump. With https://issues.apache.org/jira/browse/FLINK-32331
now completed, every failed CI run will have a JVM thread dump. You can see
the implementation for that in
https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml#L161-L195

Best regards,

Martijn


Re: [VOTE] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-06-15 Thread Shammon FY
Thanks Feng for the FLIP.

+1(non-binding)

Best,
Shammon FY

On Thu, Jun 15, 2023 at 6:30 PM Leonard Xu  wrote:

> Thanks Feng for driving this FLIP forward.
>
> +1(binding)
>
> Best,
> Leonard
>
> > On Jun 15, 2023, at 1:42 PM, Dong Lin  wrote:
> >
> > Thanks Feng for the FLIP.
> >
> > +1(binding)
> >
> > Cheers,
> > Dong
> >
> > On Wed, Jun 14, 2023 at 10:35 AM Feng Jin  wrote:
> >
> >> Hi everyone
> >>
> >> Thanks for all the feedback about the FLIP-295: Support lazy
> initialization
> >> of catalogs and persistence of catalog configurations[1].
> >> [2] is the discussion thread.
> >>
> >>
> >> I'd like to start a vote for it. The vote will be open for at least 72
> >> hours(excluding weekends,until June 19, 10:00AM GMT) unless there is an
> >> objection or an insufficient number of votes.
> >>
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> >> [2]https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
> >>
> >>
> >> Best,
> >> Feng
> >>
>
>


[jira] [Created] (FLINK-32349) Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-15 Thread tartarus (Jira)
tartarus created FLINK-32349:


 Summary: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
 Key: FLINK-32349
 URL: https://issues.apache.org/jira/browse/FLINK-32349
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: tartarus
 Fix For: 1.18.0


For detailed information, see FLIP-305

https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-15 Thread Mang Zhang
Hi, everyone.


FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement [1] has 
been accepted.


There are 5 binding votes, 2 non-binding votes [2].
- yuxia(binding)
- Jing Ge(binding)
- Ron Liu(non-binding)
- Rui Fan(binding)
- Jingsong Li(binding)
- Lincoln Lee(binding)
- Mang Zhang(non-binding)


None against.
Thanks again for every one who concerns on this FLIP.
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
[2] https://lists.apache.org/thread/8c0wlp72kq0dhcbpy08nl1kb28q17kv3

--

Best regards,
Mang Zhang

Re: [VOTE] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-06-15 Thread Leonard Xu
Thanks Feng for driving this FLIP forward. 

+1(binding)

Best,
Leonard

> On Jun 15, 2023, at 1:42 PM, Dong Lin  wrote:
> 
> Thanks Feng for the FLIP.
> 
> +1(binding)
> 
> Cheers,
> Dong
> 
> On Wed, Jun 14, 2023 at 10:35 AM Feng Jin  wrote:
> 
>> Hi everyone
>> 
>> Thanks for all the feedback about the FLIP-295: Support lazy initialization
>> of catalogs and persistence of catalog configurations[1].
>> [2] is the discussion thread.
>> 
>> 
>> I'd like to start a vote for it. The vote will be open for at least 72
>> hours(excluding weekends,until June 19, 10:00AM GMT) unless there is an
>> objection or an insufficient number of votes.
>> 
>> 
>> [1]
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
>> [2]https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
>> 
>> 
>> Best,
>> Feng
>> 



[jira] [Created] (FLINK-32348) MongoDB tests are flaky and time out

2023-06-15 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32348:
--

 Summary: MongoDB tests are flaky and time out
 Key: FLINK-32348
 URL: https://issues.apache.org/jira/browse/FLINK-32348
 Project: Flink
  Issue Type: Bug
  Components: Connectors / MongoDB
Reporter: Martijn Visser


https://github.com/apache/flink-connector-mongodb/actions/runs/5232649632/jobs/9447519651#step:13:39307



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32347) Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager

2023-06-15 Thread Tigran Manasyan (Jira)
Tigran Manasyan created FLINK-32347:
---

 Summary: Exceptions from the CompletedCheckpointStore are not 
registered by the CheckpointFailureManager 
 Key: FLINK-32347
 URL: https://issues.apache.org/jira/browse/FLINK-32347
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.17.1, 1.16.2, 1.15.3
Reporter: Tigran Manasyan


Currently, if an error occurs while saving a completed checkpoint in the 
{_}CompletedCheckpointStore{_}, the _CheckpointCoordinator_ doesn't call the 
_CheckpointFailureManager_ to handle the error. Such behavior leads to the 
fact that errors from the _CompletedCheckpointStore_ don't increase the failed 
checkpoints count, and the 
_'execution.checkpointing.tolerable-failed-checkpoints'_ option does not limit 
the number of errors of this kind in any way.

A possible solution may be to move the notification of the 
_CheckpointFailureManager_ about a successful checkpoint to after the 
completed checkpoint is stored in the _CompletedCheckpointStore_, and to 
provide the exception to the _CheckpointFailureManager_ in the 
{_}CheckpointCoordinator#{_}{_}[addCompletedCheckpointToStoreAndSubsumeOldest()|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1440]{_}
 method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32346) JdbcNumericBetweenParametersProvider: sharding key boundaries stored as long can overflow; use BigDecimal instead of Long

2023-06-15 Thread zhilinli (Jira)
zhilinli created FLINK-32346:


 Summary: JdbcNumericBetweenParametersProvider: sharding key 
boundaries stored as long can overflow; use BigDecimal instead of Long
 Key: FLINK-32346
 URL: https://issues.apache.org/jira/browse/FLINK-32346
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: zhilinli
 Attachments: image-2023-06-15-16-42-16-773.png

Sharding key boundaries stored as long can overflow for large values. Use 
BigDecimal instead of Long, so that wide types such as DecimalType(30,0), whose 
values cannot be stored in a LONG, are also supported. This can be assigned to 
me; I want to complete it.

!image-2023-06-15-16-45-44-721.png!

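The overflow is easy to reproduce in isolation. The snippet below mimics the
range arithmetic with plain long values (a standalone demonstration, not the
provider's actual code) and shows how BigDecimal avoids the wrap-around:

{code:java}
import java.math.BigDecimal;

public class ShardBoundaryOverflowDemo {
    public static void main(String[] args) {
        long min = Long.MIN_VALUE / 2;
        long max = Long.MAX_VALUE / 2 + 10;

        // long arithmetic: max - min exceeds Long.MAX_VALUE and wraps around,
        // so the computed range becomes negative.
        long brokenRange = max - min + 1;
        System.out.println("long range       = " + brokenRange);

        // BigDecimal arithmetic keeps the correct, positive range and also
        // covers values (e.g. DecimalType(30,0)) that never fit into a long.
        BigDecimal safeRange = BigDecimal.valueOf(max)
                .subtract(BigDecimal.valueOf(min))
                .add(BigDecimal.ONE);
        System.out.println("BigDecimal range = " + safeRange);
    }
}
{code}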


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32345) Improve parallel download of RocksDB incremental state

2023-06-15 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32345:
--

 Summary: Improve parallel download of RocksDB incremental state
 Key: FLINK-32345
 URL: https://issues.apache.org/jira/browse/FLINK-32345
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


{{RocksDBStateDownloader}} is used to download the files for incremental 
checkpoints in parallel. However, the parallelism is currently restricted to a 
single {{IncrementalRemoteKeyedStateHandle}} and also a single state type 
(shared, private) within the handle at a time.
We should support parallelization across multiple state types and across 
multiple state handles. In particular, this can improve our download times for 
scale-in.

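A rough sketch of the proposed direction, with hypothetical stand-in types
(the real {{RocksDBStateDownloader}} works on Flink's internal handle
classes): flatten all (handle, state type, file) combinations into one task
list and let a single shared pool work on it, instead of parallelizing only
within one handle and one state type at a time.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelStateDownloadSketch {

    /** Hypothetical stand-in for one remote file of one handle/state type. */
    record DownloadTask(String handleId, String stateType, String remoteFile)
            implements Callable<Void> {
        @Override
        public Void call() {
            // The real implementation would copy remoteFile to local disk.
            System.out.println("downloading " + remoteFile
                    + " (" + handleId + "/" + stateType + ")");
            return null;
        }
    }

    static void downloadAll(List<DownloadTask> allHandlesAllTypes)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // One global invokeAll saturates the pool across handles and
            // state types, instead of per-handle, per-type batches.
            pool.invokeAll(allHandlesAllTypes);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<DownloadTask> tasks = new ArrayList<>();
        for (String handle : List.of("handle-1", "handle-2")) {
            for (String type : List.of("shared", "private")) {
                tasks.add(new DownloadTask(handle, type, handle + "-" + type + "-file"));
            }
        }
        downloadAll(tasks);
    }
}
{code}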


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32344) MongoDB connector support unbounded streaming read via ChangeStream feature

2023-06-15 Thread Jiabao Sun (Jira)
Jiabao Sun created FLINK-32344:
--

 Summary: MongoDB connector support unbounded streaming read via 
ChangeStream feature
 Key: FLINK-32344
 URL: https://issues.apache.org/jira/browse/FLINK-32344
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / MongoDB
Affects Versions: mongodb-1.0.1
Reporter: Jiabao Sun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Aitozi
Hi Lijie,

Nice to see this valuable feature. After reading the FLIP I have some
questions below:

>Schedule the TableSource(dim) first.

How does it know to schedule the TableSource(dim) first? IMO, in the
current implementation the two source table operators will be executed
simultaneously.

>If the data volume on the probe side is too small, the overhead of
building runtime filter is not worth it.

Are there some tests to show that the default value of
table.optimizer.runtime-filter.min-probe-data-size (10G) is a good default
value? The same for table.optimizer.runtime-filter.max-build-data-size.

>the runtime filter can be pushed down along the probe side, as close to
data sources as possible

What's the representation of the runtime filter node in the planner? Is it
a Filter node?

Best,

Aitozi.

Benchao Li wrote on Thu, Jun 15, 2023 at 14:30:

> Hi Lijie,
>
> Regarding the shuffle mode, I think it would be reasonable to also support
> "pipeline shuffle" if possible.
>
> "pipeline shuffle" is a essential for OLAP/MPP computing, although this has
> not been much exposed to users for now, I know a few companies that uses
> Flink as a MPP computing engine, and there is an ongoing effort[1] to make
> this usage more powerful.
>
> Back to your concern that "Even if the RuntimeFilter becomes running before
> the RuntimeFilterBuilder finished, it will not process any data and will
> occupy resources", whether it benefits us depends on the scale of data, if
> the RuntimeFilterBuilder could be done more quickly than the RuntimeFilter operator,
> it can still filter out additional data afterwards. Hence in my opinion, we
> do not need to make the edge between RuntimeFilterBuilder and RuntimeFilter
> BLOCKING only, at least it can be configured.
>
> [1] https://issues.apache.org/jira/browse/FLINK-25318
>
> Lijie Wang wrote on Thu, Jun 15, 2023 at 14:18:
>
> > Hi Yuxia,
> >
> > I made a mistake in the above response.
> >
> > The runtime filter can work well with all shuffle modes. However, hybrid
> > shuffle and blocking shuffle are currently recommended for batch jobs
> > (pipeline shuffle is not recommended).
> >
> > One more thing to mention here is that we will force the edge between
> > RuntimeFilterBuilder and RuntimeFilter to be BLOCKING(regardless of which
> > BatchShuffleMode is set). Because the RuntimeFilter really doesn’t need
> to
> > run before the RuntimeFilterBuilder finishes. Even if the RuntimeFilter
> > starts running before the RuntimeFilterBuilder finishes, it will not
> > process any data and will occupy resources.
> >
> > Best,
> > Lijie
> >
> > Lijie Wang wrote on Thu, Jun 15, 2023 at 09:48:
> >
> > > Hi Yuxia,
> > >
> > > Thanks for your feedback. The answers to your questions are as follows:
> > >
> > > 1. Yes, the row count comes from statistic of underlying table(Or
> > > estimated based on the statistic of underlying table, if the build side
> > or
> > > probe side is not a TableScan). If the statistics are unavailable, we will
> not
> > > inject a runtime filter(As you said, we can hardly evaluate the
> > benefits).
> > > Besides, AFAIK, the estimated data size of build side is also based on
> > the
> > > row count statistics, that is, if the statistics are unavailable, the
> > > requirement "table.optimizer.runtime-filter.max-build-data-size" cannot
> > be
> > > evaluated either. I'll add this point into FLIP.
> > >
> > > 2.
> > > Estimated data size does not meet requirement (in planner optimization
> > > phase) -> No filter
> > > Estimated data size meets the requirement (in planner optimization
> > phase),
> > > but the real data size does not meet the requirement(in execution
> phase)
> > ->
> > > Fake filter
> > >
> > > 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> > >
> > > Best,
> > > Lijie
> > >
> > > yuxia wrote on Wed, Jun 14, 2023 at 20:37:
> > >
> > >> Thanks Lijie for starting this discussion. Excited to see runtime
> filter
> > >> is to be implemented in Flink.
> > >> I have few questions about it:
> > >>
> > >> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
> > >> instead`. So, does the row count come from the statistics of the underlying
> > >> table? What if the statistics are also unavailable, considering users
> > >> may not always remember to generate statistics in production?
> > >> I'm wondering whether it makes sense to just disable the runtime filter
> if
> > >> the statistics are unavailable, since in that case we can hardly evaluate
> the
> > >> benefits of the runtime filter.
> > >>
> > >>
> > >> 2: The FLIP said: "We will inject the runtime filters only if the
> > >> following requirements are met:xxx", but it also said, "Once this
> limit
> > is
> > >> exceeded, it will output a fake filter(which always returns true)" in
> > >> the `RuntimeFilterBuilderOperator` part; these seem contradictory, so
> > I'm
> > >> wondering what the real behavior is: will no filter be injected, or a
> > >> fake filter?
> > >>
> > >>
> > >> 3: Does it also mean runtime-filter can only take effect in blocking
> > >> shuffle?
> > >>

[jira] [Created] (FLINK-32342) SQL Server container behaves unexpectedly while testing with several surefire forks

2023-06-15 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32342:
---

 Summary: SQL Server container behaves unexpectedly while testing 
with several surefire forks
 Key: FLINK-32342
 URL: https://issues.apache.org/jira/browse/FLINK-32342
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / JDBC
Affects Versions: jdbc-3.1.0
Reporter: Sergey Nuyanzin


By default it inherits {{flink.forkCountITCase == 2}} from Flink.
It looks like the SQL Server container has issues starting in several surefire 
forks.
Based on 
[https://github.com/MartijnVisser/flink-connector-jdbc/actions/runs/5265349453/jobs/9517854060]
the SQL Server container hangs while starting:
{noformat}
"main" #1 prio=5 os_prio=0 cpu=1965.96ms elapsed=2568.93s 
tid=0x7f84a0027000 nid=0x1c82 runnable  [0x7f84a41fc000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(java.base@11.0.19/Native 
Method)
at 
java.net.SocketInputStream.socketRead(java.base@11.0.19/SocketInputStream.java:115)
at 
java.net.SocketInputStream.read(java.base@11.0.19/SocketInputStream.java:168)
at 
java.net.SocketInputStream.read(java.base@11.0.19/SocketInputStream.java:140)
at 
com.microsoft.sqlserver.jdbc.TDSChannel$ProxyInputStream.readInternal(IOBuffer.java:1192)
- locked <0x930e38f0> (a 
com.microsoft.sqlserver.jdbc.TDSChannel$ProxyInputStream)
at 
com.microsoft.sqlserver.jdbc.TDSChannel$ProxyInputStream.read(IOBuffer.java:1179)
at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:2307)
- locked <0x930e38f0> (a 
com.microsoft.sqlserver.jdbc.TDSChannel$ProxyInputStream)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.Prelogin(SQLServerConnection.java:3391)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:3200)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:2833)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:2671)
at 
com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:1640)
at 
com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:936)
at 
org.testcontainers.containers.JdbcDatabaseContainer.createConnection(JdbcDatabaseContainer.java:253)
at 
org.testcontainers.containers.JdbcDatabaseContainer.createConnection(JdbcDatabaseContainer.java:218)
at 
org.testcontainers.containers.JdbcDatabaseContainer.waitUntilContainerStarted(JdbcDatabaseContainer.java:158)
at 
org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:490)
at 
org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:344)
at 
org.testcontainers.containers.GenericContainer$$Lambda$532/0x0001003d1440.call(Unknown
 Source)
at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
at 
org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:334)
at 
org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)
at 
org.apache.flink.connector.jdbc.testutils.databases.sqlserver.SqlServerDatabase$SqlServerContainer.start(SqlServerDatabase.java:81)
at 
org.apache.flink.connector.jdbc.testutils.databases.sqlserver.SqlServerDatabase.startDatabase(SqlServerDatabase.java:52)
at 
org.apache.flink.connector.jdbc.testutils.DatabaseExtension.beforeAll(DatabaseExtension.java:122)
...
{noformat}


As a workaround, setting {{flink.forkCountITCase == 1}} solves the issue.
However, we need to find a better way to allow running tests with several forks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32343) Fix exception for jdbc tools

2023-06-15 Thread Fang Yong (Jira)
Fang Yong created FLINK-32343:
-

 Summary: Fix exception for jdbc tools
 Key: FLINK-32343
 URL: https://issues.apache.org/jira/browse/FLINK-32343
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / JDBC
Affects Versions: 1.18.0
Reporter: Fang Yong


Fix exception for jdbc tools



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Benchao Li
Hi Lijie,

Regarding the shuffle mode, I think it would be reasonable to also support
"pipeline shuffle" if possible.

"pipeline shuffle" is a essential for OLAP/MPP computing, although this has
not been much exposed to users for now, I know a few companies that uses
Flink as a MPP computing engine, and there is an ongoing effort[1] to make
this usage more powerful.

Back to your concern that "Even if the RuntimeFilter becomes running before
the RuntimeFilterBuilder finished, it will not process any data and will
occupy resources", whether it benefits us depends on the scale of data, if
the RuntimeFilterBuilder could be done more quickly than the RuntimeFilter operator,
it can still filter out additional data afterwards. Hence in my opinion, we
do not need to make the edge between RuntimeFilterBuilder and RuntimeFilter
BLOCKING only, at least it can be configured.

[1] https://issues.apache.org/jira/browse/FLINK-25318

Lijie Wang wrote on Thu, Jun 15, 2023 at 14:18:

> Hi Yuxia,
>
> I made a mistake in the above response.
>
> The runtime filter can work well with all shuffle modes. However, hybrid
> shuffle and blocking shuffle are currently recommended for batch jobs
> (pipeline shuffle is not recommended).
>
> One more thing to mention here is that we will force the edge between
> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING(regardless of which
> BatchShuffleMode is set). Because the RuntimeFilter really doesn’t need to
> run before the RuntimeFilterBuilder finishes. Even if the RuntimeFilter
> starts running before the RuntimeFilterBuilder finishes, it will not
> process any data and will occupy resources.
>
> Best,
> Lijie
>
> Lijie Wang wrote on Thu, Jun 15, 2023 at 09:48:
>
> > Hi Yuxia,
> >
> > Thanks for your feedback. The answers to your questions are as follows:
> >
> > 1. Yes, the row count comes from statistic of underlying table(Or
> > estimated based on the statistic of underlying table, if the build side
> or
> > probe side is not a TableScan). If the statistics are unavailable, we will not
> > inject a runtime filter(As you said, we can hardly evaluate the
> benefits).
> > Besides, AFAIK, the estimated data size of build side is also based on
> the
> > row count statistics, that is, if the statistics are unavailable, the
> > requirement "table.optimizer.runtime-filter.max-build-data-size" cannot
> be
> > evaluated either. I'll add this point into FLIP.
> >
> > 2.
> > Estimated data size does not meet requirement (in planner optimization
> > phase) -> No filter
> > Estimated data size meets the requirement (in planner optimization
> phase),
> > but the real data size does not meet the requirement(in execution phase)
> ->
> > Fake filter
> >
> > 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> >
> > Best,
> > Lijie
> >
> > yuxia wrote on Wed, Jun 14, 2023 at 20:37:
> >
> >> Thanks Lijie for starting this discussion. Excited to see the runtime
> >> filter being implemented in Flink.
> >> I have a few questions about it:
> >>
> >> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
> >> instead`. So, does the row count come from the statistics of the underlying
> >> table? What if the statistics are also unavailable, considering users
> >> may not always remember to generate statistics in production?
> >> I'm wondering whether it makes sense to just disable the runtime filter if
> >> the statistics are unavailable, since in that case we can hardly evaluate the
> >> benefits of the runtime filter.
> >>
> >>
> >> 2: The FLIP said: "We will inject the runtime filters only if the
> >> following requirements are met:xxx", but it also said, "Once this limit
> is
> >> exceeded, it will output a fake filter(which always returns true)" in
> >> the `RuntimeFilterBuilderOperator` part; these seem contradictory, so
> I'm
> >> wondering what the real behavior is: will no filter be injected, or a
> >> fake filter?
> >>
> >>
> >> 3: Does it also mean runtime-filter can only take effect in blocking
> >> shuffle?
> >>
> >>
> >>
> >> Best regards,
> >> Yuxia
> >>
> >> - Original Message -
> >> From: "ron9 liu" 
> >> To: "dev" 
> >> Sent: Wednesday, June 14, 2023, 5:29:28 PM
> >> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch
> Jobs
> >>
> >> Thanks Lijie for starting this discussion. Runtime Filter is a common
> >> optimization
> >> to improve join performance that has been adopted by many computing
> >> engines such as Spark, Doris, etc. Flink is a unified streaming/batch
> computing
> >> engine, and we are continuously optimizing batch performance.
> >> Runtime filter is a general performance optimization technique that can
> >> improve the performance of Flink batch jobs, so we are introducing it for
> >> batch as well.
> >>
> >> Looking forward to all feedback.
> >>
> >> Best,
> >> Ron
> >>
> >> > Lijie Wang wrote on Wed, Jun 14, 2023 at 17:17:
> >>
> >> > Hi devs
> >> >
> >> > Ron Liu, Gen Luo and I would like to start a discussion about
> FLIP-324:
> >> > Introduce Runtime Filter for Flink Batch Jobs[1]
> >> >
> >> > Runtime Filter is a common 

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Lijie Wang
Hi Yuxia,

I made a mistake in the above response.

The runtime filter can work well with all shuffle modes. However, hybrid
shuffle and blocking shuffle are currently recommended for batch jobs
(pipeline shuffle is not recommended).

One more thing to mention here is that we will force the edge between
RuntimeFilterBuilder and RuntimeFilter to be BLOCKING(regardless of which
BatchShuffleMode is set). Because the RuntimeFilter really doesn’t need to
run before the RuntimeFilterBuilder finishes. Even if the RuntimeFilter
starts running before the RuntimeFilterBuilder finishes, it will not
process any data and will occupy resources.

Best,
Lijie

Lijie Wang wrote on Thu, Jun 15, 2023 at 09:48:

> Hi Yuxia,
>
> Thanks for your feedback. The answers to your questions are as follows:
>
> 1. Yes, the row count comes from the statistics of the underlying table (or
> is estimated based on the statistics of the underlying table, if the build
> side or probe side is not a TableScan). If the statistics are unavailable,
> we will not inject a runtime filter (as you said, we can hardly evaluate
> the benefits). Besides, AFAIK, the estimated data size of the build side is
> also based on the row count statistics, that is, if the statistics are
> unavailable, the requirement
> "table.optimizer.runtime-filter.max-build-data-size" cannot be evaluated
> either. I'll add this point to the FLIP.
>
> 2.
> Estimated data size does not meet requirement (in planner optimization
> phase) -> No filter
> Estimated data size meets the requirement (in planner optimization phase),
> but the real data size does not meet the requirement(in execution phase) ->
> Fake filter
>
> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
>
> Best,
> Lijie
>
> yuxia wrote on Wed, Jun 14, 2023 at 20:37:
>
>> Thanks Lijie for starting this discussion. Excited to see the runtime
>> filter being implemented in Flink.
>> I have a few questions about it:
>>
>> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
>> instead`. So, does the row count come from the statistics of the underlying
>> table? What if the statistics are also unavailable, considering users
>> may not always remember to generate statistics in production?
>> I'm wondering whether it makes sense to just disable the runtime filter if
>> the statistics are unavailable, since in that case we can hardly evaluate the
>> benefits of the runtime filter.
>>
>>
>> 2: The FLIP said: "We will inject the runtime filters only if the
>> following requirements are met:xxx", but it also said, "Once this limit is
>> exceeded, it will output a fake filter(which always returns true)" in
>> the `RuntimeFilterBuilderOperator` part; these seem contradictory, so I'm
>> wondering what the real behavior is: will no filter be injected, or a fake
>> filter?
>>
>>
>> 3: Does it also mean runtime-filter can only take effect in blocking
>> shuffle?
>>
>>
>>
>> Best regards,
>> Yuxia
>>
>> - Original Message -
>> From: "ron9 liu" 
>> To: "dev" 
>> Sent: Wednesday, June 14, 2023, 5:29:28 PM
>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>>
>> Thanks Lijie for starting this discussion. Runtime Filter is a common
>> optimization
>> to improve join performance that has been adopted by many computing
>> engines such as Spark, Doris, etc. Flink is a unified streaming/batch
>> computing engine, and we are continuously optimizing batch performance.
>> Runtime filter is a general performance optimization technique that can
>> improve the performance of Flink batch jobs, so we are introducing it for
>> batch as well.
>>
>> Looking forward to all feedback.
>>
>> Best,
>> Ron
>>
>> > Lijie Wang wrote on Wed, Jun 14, 2023 at 17:17:
>>
>> > Hi devs
>> >
>> > Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
>> > Introduce Runtime Filter for Flink Batch Jobs[1]
>> >
>> > Runtime Filter is a common optimization to improve join performance. It
>> is
>> > designed to dynamically generate filter conditions for certain Join
>> queries
>> > at runtime to reduce the amount of scanned or shuffled data, avoid
>> > unnecessary I/O and network transmission, and speed up the query. Its
>> > working principle is to build a filter (e.g. a bloom filter) based on the
>> data
>> > on the small table side (build side) first, then pass this filter to the
>> > large table side (probe side) to filter out irrelevant data there; this
>> can
>> > reduce the data reaching the join and improve performance.
>> >
>> > You can find more details in the FLIP-324[1]. Looking forward to your
>> > feedback.
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
>> >
>> > Best,
>> > Ron & Gen & Lijie
>> >
>>
>


Re: [DISCUSS] FLIP-246: Multi Cluster Kafka Source

2023-06-15 Thread Mason Chen
Hi Gordon,

Thanks for your feedback as always.

Why not just Map<String, ClusterMetadata>?

I think it makes sense to relocate the bootstrapServer field into the Kafka
properties (the ClusterMetadata abstraction), since that is intuitive given
how it is defined in the Kafka clients library. It also makes the
uniqueness of a cluster clear--it's not a combination of cluster name and
bootstrap server but rather just the cluster name. Bootstrap server(s) can
change over time by Kafka's design (exposing a few brokers or putting
all brokers behind a single entry point). A rough sketch of this
reorganization follows at the end of this message.

I'll solicit more feedback on the name in the voting thread, with the 3
options (DynamicKafkaSource, MultiClusterKafkaSource,
DiscoveringKafkaSource). I will start the thread tomorrow after minor
touches on the FLIP based on our last few exchanges!

Best,
Mason
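
To make option 3 above concrete, here is a rough sketch of cluster metadata
keyed by a stable cluster name, with the bootstrap servers folded into the
per-cluster Kafka properties. The class and field names (KafkaStream,
ClusterMetadata, clusterMetadataMap) follow the wording of this thread but
are assumptions, not the finalized FLIP-246 interfaces.

{code:java}
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/** Sketch of option 3: all cluster metadata keyed by cluster name. */
public final class KafkaStream implements Serializable {

    /** Per-cluster metadata; bootstrap servers live inside the Kafka properties. */
    public static final class ClusterMetadata implements Serializable {
        private final Set<String> topics;
        private final Properties properties; // includes "bootstrap.servers"

        public ClusterMetadata(Set<String> topics, Properties properties) {
            this.topics = topics;
            this.properties = properties;
        }

        public Set<String> getTopics() { return topics; }
        public Properties getProperties() { return properties; }
    }

    private final String streamId;
    // Keyed by cluster name only: a cluster's identity is its name, so its
    // bootstrap servers can change over time without changing the identity.
    private final Map<String, ClusterMetadata> clusterMetadataMap;

    public KafkaStream(String streamId, Map<String, ClusterMetadata> clusterMetadataMap) {
        this.streamId = streamId;
        this.clusterMetadataMap = clusterMetadataMap;
    }

    public String getStreamId() { return streamId; }
    public Map<String, ClusterMetadata> getClusterMetadataMap() { return clusterMetadataMap; }
}
{code}

With this shape, equality checks on a cluster reduce to comparing names,
which is exactly the simplification argued for above.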

On Wed, Jun 14, 2023 at 1:31 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Mason,
>
> Thanks for addressing my comments. I agree that option 3 seems more
> reasonable.
>
> > Reorganize the metadata in a Map in
> > `KafkaStream` where the String is the proposed
> > `KafkaClusterIdentifier.name` field.
>
> Why not just Map?
>
> Regarding naming, I like DynamicKafkaSource as that's what I immediately
> thought of when reading the FLIP, but I'm not married to the name :)
>
> In principle, it looks like the FLIP is in good shape and generally people
> seem to like the idea of having this connector in Flink.
> I'd be in favor of an official vote to allow this to move forward.
>
> Thanks,
> Gordon
>
> On Mon, Jun 12, 2023 at 1:57 PM Mason Chen  wrote:
>
> > >
> > > My main worry for doing this as a later iteration is that this would
> > > probably be a breaking change for the public interface. If that can be
> > > avoided and planned ahead, I'm fine with moving forward with how it is
> > > right now.
> >
> >
> > Makes sense. Considering the public interfaces, I think we still want to
> > provide clients the ability to pin certain configurations in the
> > builder--however, cluster specific configurations may not be known
> > upfront or generalize to all clusters, so there would need to be changes
> > in the `KafkaMetadataService` interface. This could be achieved by
> > exposing them via:
> >
> > 1. A separate API (e.g. `Map
> > getKafkaClusterProperties()`) in KafkaMetadataService
> > 2. In `KafkaClusterIdentifier`, as this already contains some
> > configuration (e.g. the bootstrap server), in which case we should rename
> > the class to something like `KafkaCluster` as it is no longer just an
> > identifier
> > 3. Reorganize the metadata in a Map in
> > `KafkaStream` where the String is the proposed
> > `KafkaClusterIdentifier.name` field.
> >
> > I prefer option 3 since it simplifies equals() checks on
> > KafkaClusterIdentifier (e.g. is identity the name, the bootstrap servers,
> > or both?).
> >
> > > Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> > > reader is responsible for discovering and assigning splits from 1+
> > > cluster"
> >
> > Thanks for the catch!
> >
> > > the defining characteristic is the dynamic discovery vs. the fact that
> > > multiple clusters [...]
> >
> > > I think the "Table" in the name of those SQL connectors should avoid
> > > confusion. Perhaps we can also solicit other ideas? I would throw
> > > "DiscoveringKafkaSource" into the mix.
> >
> > Agreed with Gordon's and your suggestions. Right, the only public-facing
> > name for SQL is `kafka`, the SQL connector identifier. Based on your
> > suggestions:
> >
> > 1. MultiClusterKafkaSource
> > 2. DynamicKafkaSource
> > 3. DiscoveringKafkaSource
> > 4. MutableKafkaSource
> > 5. AdaptiveKafkaSource
> >
> > I added a few of my own. I do prefer 2. What do others think?
> >
> > Best,
> > Mason
> >
> > On Sun, Jun 11, 2023 at 1:12 PM Thomas Weise  wrote:
> >
> > > Hi Mason,
> > >
> > > Thanks for the iterations on the FLIP, I think this is in a very good
> > shape
> > > now.
> > >
> > > Small correction for the MultiClusterKafkaSourceEnumerator section:
> > > "This reader is responsible for discovering and assigning splits from
> > > 1+ cluster"
> > >
> > > Regarding the user-facing name of the connector: I agree with Gordon
> > > that the defining characteristic is the dynamic discovery vs. the fact
> > > that multiple clusters may be consumed in parallel. (Although, as
> > > described in the FLIP, lossless consumer migration only works with a
> > > strategy that involves intermittent parallel consumption of old and new
> > > clusters to drain and switch.)
> > >
> > > I think the "Table" in the name of those SQL connectors should avoid
> > > confusion. Perhaps we can also solicit other ideas? I would throw
> > > "DiscoveringKafkaSource" into the mix.
> > >
> > > Cheers,
> > > Thomas
> > >
> > >
> > >
> > >
> > > On Fri, Jun 9, 2023 at 3:40 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > > wrote:
> > >
> > > > > Regarding (2), definitely. This is something we planned to add
> > > > > later on but so far keeping things common