Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2023-01-27 Thread Jark Wu
Thanks Steven for the explanation.

It sounds good to me to implement the shuffle operator in the Iceberg
project first.
We can contribute it to Flink DataStream in the future if other
projects/connectors also need it.
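To make this concrete, here is a purely hypothetical sketch of what such a
generic operator could look like on DataStream (the `dynamicShuffle` name and
signature are assumptions for illustration, not an agreed API):

    // Hypothetical API sketch: `dynamicShuffle` does not exist in Flink today.
    // The idea: collect per-key traffic statistics at runtime and choose a
    // skew-aware distribution of records before the sink.
    DataStream<RowData> input =
            env.fromSource(icebergSource, WatermarkStrategy.noWatermarks(), "source");

    DataStream<RowData> shuffled =
            input.dynamicShuffle(row -> row.getString(0)); // hypothetical key selector

    shuffled.sinkTo(icebergSink);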

Best,
Jark


On Wed, 18 Jan 2023 at 02:11, Steven Wu  wrote:

> Jark,
>
> We were planning to discard the proposal due to some valid concerns raised
> in the thread. Also, this proposal itself didn't really save too much code
> duplication (maybe 100 lines or so).
>
> I also thought that the shuffle operator for DataStream can be useful for
> other connectors too. The shuffling part (based on traffic statistics) can
> be generic for other connectors. There will be some small integration part
> unique to Iceberg, which can stay in Iceberg. If we go with this new
> direction, we would need a new FLIP.
>
> Thanks,
> Steven
>
>
>
> On Mon, Jan 16, 2023 at 12:30 AM Jark Wu  wrote:
>
>> What's the status and conclusion of this discussion?
>>
>> I have seen the value of exposing OperatorCoordinator because of the
>> powerful RPC calls,
>> some projects are already using it, such as Hudi[1]. But I agree this is
>> a large topic and
>> requires another FLIP.
>>
>> I am also concerned about extracting a public base class: without
>> implementations and
>> clear usage, it is easy to break in the future. However, I think the
>> shuffling operator can be a
>> generic component used by other connectors and DataStream jobs.
>>
>> Have you considered contributing the ShuffleOperator to the Flink main
>> repository as a
>> part of DataStream API (e.g., DataStream#dynamicShuffle)? It's easy to
>> extract the common
>> part between SourceCoordinatorContext and ShuffleCoordinatorContext in a
>> single repository
>>  as an internal implementation.
>>
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
>>
>> On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski  wrote:
>>
>>> Ohhh, I was confused. I thought that the proposal is to make
>>> `CoordinatorContextBase` part of the public API.
>>>
>>> However, I'm against extracting `CoordinatorContextBase` as an
>>> `@Internal` class as well.
>>>
>>> 1. Connectors shouldn't reuse internal classes. Using `@Internal`
>>> CoordinatedOperatorFactory would be already quite bad, but at least this
>>> is
>>> a relatively stable internal API. Using `@Internal`
>>> `@CoordinatorContextBase`, and refactoring out this base class just for
>>> the
>>> sake of re-using it in a connector is IMO even worse.
>>> 2. Doubly so if they are in a separate repository (as the iceberg
>>> connector
>>> will be/is, right?). There would be no way to prevent breaking changes
>>> between repositories.
>>>
>>> If that's only intended as the stop-gap solution until we properly expose
>>> coordinators, the lesser evil would be IMO to copy/paste/modify
>>> SourceCoordinatorContext to the flink-connector-iceberg repository.
>>>
>>> Best,
>>> Piotrek
>>>
>>> On Thu, 3 Nov 2022 at 12:51, Maximilian Michels wrote:
>>>
>>> > +1 If we wanted to expose the OperatorCoordinator API, we should
>>> provide
>>> > an adequate interface. The FLIP partially addresses this by trying to
>>> > factor out RPC code which other coordinators might make use of, but
>>> there
>>> > is additional design necessary to realize a public operator API.
>>> >
>>> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I
>>> > think they make sense in the context of an Iceberg ShuffleCoordinator
>>> in
>>> > Flink. If we were to add such a new coordinator, feel free to make the
>>> > proposed code refactoring as part of a pull request. A FLIP isn't
>>> strictly
>>> > necessary here because this is a purely internal change which does not
>>> > alter public APIs, nor does it alter the internal architecture, apart
>>> from
>>> > reusing a bit of existing code. I'm sorry if we consumed some of your
>>> time
>>> > revising the document but I think we had a healthy discussion here. And
>>> > we're definitely looking forward to seeing some of these code changes!
>>> >
>>> > -Max
>>> >
>>> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski 
>>> > wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Sorry for the delay, but I've given more thoughts into this. First I
>>> >> share the same thoughts as Maximilian, that this FLIP is incomplete.
>>> As I
>>> >> understand it, you are trying to hack existing code to expose small
>>> bits of
>>> >> internal functionalities as part of the public API without solving
>>> many of
>>> >> the underlying issues.
>>> >>
>>> >> For example, what's the point of exposing `CoordinatorContextBase` as
>>> a
>>> >> public API if users can not use it? After all, the
>>> `OperatorCoordinator`
>>> >> and `CoordinatedOperatorFactory` would remain internal. At the same
>>> time,
>>> >> this FLIP would officially force us to support and maintain this
>>> >> 

Re: [Discuss] Conventions on driving FLIPs towards consensus

2023-01-27 Thread Jark Wu
Thanks Xintong for starting the discussion among the PMC and bringing the summary
here.

The summary looks good to me. Unresponsive reviewing has happened numerous
times in previous FLIPs. I think the conventions are very valuable for the
community to move forward efficiently and avoid potential conflicts in the
future.

Best,
Jark

On Sat, 28 Jan 2023 at 12:26, Xintong Song  wrote:

> Hi devs,
>
> Recently, a discussion about how to drive FLIPs towards consensus has taken
> place on the private@ mailing list. While many PMC members have already
> shared their opinions, we believe for this topic the opinions of other
> committers / contributors are equally important. Therefore, we are moving
> the discussion to the dev@ mailing list for a wider involvement.
>
> ### Background
>
> Flink Improvement Proposal (FLIP) [1], the process for proposing,
> discussing, reviewing and voting on major changes to Flink, plays an
> important role in driving the project forward. According to the process,
> *consensus*[2] is required for any proposal to be accepted. This means
> objections from any single committer can block the process. It has been
> observed many times that a FLIP is long blocked on a disapproving
> committer. In most cases, this is necessary for addressing technical
> concerns. However, there are also cases where, after raising concerns, the
> committer becomes unresponsive (due to other works, personal vacation
> plans, etc.), leaving the FLIP blocked for an unnecessarily long time.
>
> The purpose of this discussion is to come up with some conventions on
> preventing FLIPs from being long blocked on unresponsive reviewers.
>
> ### Summary of the previous discussion on private@
>
>- Most people agree that the progress of a FLIP should not be long
>blocked on an unresponsive reviewer. When a reviewer blocks the
> progress of
>a FLIP, he/she should be responsive to the subsequent replies, or at
> least
>provide a reasonable estimated time of response.
>- As for how long we should wait for the responses, there’s a tendency
>towards relying on the judgement of individuals while also having a
>typically recommended time (1 week).
>- Committers should use their veto rights with care. Per the ASF policy
>[3], vetos must be provided with a technical justification showing why
> the
>change is bad. They should not be used for simply blocking the process
> so
>the voter has more time to catch up.
>- We’d also encourage the FLIP proposers to actively reach out to the
>interested parties (e.g., previous contributors of the relevant part)
>early. It helps expose and address the potential concerns early and also
>leaves more time for other parties to respond while the proposer works
> on
>the FLIP.
>- We’d like to work on trust rather than heavy rules and processes. All
>above, if agreed, should be conventions among the community. We would
> not
>formally change the FLIP process.
>
>
> Looking forward to your opinions.
>
>
> Best,
>
> Xintong
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Approvals
>
> [3] https://www.apache.org/foundation/voting.html#Veto
>


[jira] [Created] (FLINK-30810) Rework the CliClientITCase to extend AbstractSqlGatewayStatementITCase

2023-01-27 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-30810:
-

 Summary: Rework the CliClientITCase to extend 
AbstractSqlGatewayStatementITCase
 Key: FLINK-30810
 URL: https://issues.apache.org/jira/browse/FLINK-30810
 Project: Flink
  Issue Type: Sub-task
Reporter: Shengkai Fang


We should always use the AbstractSqlGatewayStatementITCase to cover the 
statement tests in the sql-client/sql-gateway. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30809) flink-connector-elasticsearch7 updates data pipeline does not work

2023-01-27 Thread iduanyingjie (Jira)
iduanyingjie created FLINK-30809:


 Summary: flink-connector-elasticsearch7 updates data pipeline does 
not work
 Key: FLINK-30809
 URL: https://issues.apache.org/jira/browse/FLINK-30809
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: elasticsearch-3.0.0
 Environment: Flink Version: 1.15.3
Mysql Version: 5.7
Elasticsearch Version: 7.17.7
Reporter: iduanyingjie


Create Elasticsearch in Docker:
{code:yaml}
version: '2.1'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.7
    ports:
      - "5601:5601"
{code}
Create table records in MySQL:
{code:sql}
CREATE TABLE records (
 id bigint unsigned NOT NULL AUTO_INCREMENT,
 user_id bigint unsigned NOT NULL,
 create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
{code}
Insert some data:
{code:sql}
INSERT INTO test.records (id, user_id, create_time) VALUES(default, 123, 
'2023-01-20 12:25:11');
INSERT INTO test.records (id, user_id, create_time) VALUES(default, 456, 
'2023-01-20 12:25:30');
INSERT INTO test.records (id, user_id, create_time) VALUES(default, 789, 
'2023-01-20 12:25:37');
{code}
Create an ingest pipeline in Elasticsearch:
{code:json}
PUT /_ingest/pipeline/set_ingest_timestamp_fields
{
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
{code}
Create the index in Elasticsearch:
{code:json}
PUT enriched_records
{
  "settings": {
    "default_pipeline": "set_ingest_timestamp_fields",
    "number_of_shards": "1",
    "number_of_replicas": "0"
  }
}
{code}
Execute the Flink SQL:
{code:sql}
CREATE TABLE records (
   id INT,
   user_id INT,
   create_time TIMESTAMP(3),
   proc_time AS PROCTIME(),
   operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = 'root',
   'password' = '123456',
   'database-name' = 'test',
   'table-name' = 'records',
   'server-time-zone' = 'UTC'
);
CREATE TABLE enriched_records (
   id INT,
   user_id INT,
   create_time TIMESTAMP(3),
   proc_time TIMESTAMP_LTZ(3),
   operation_time TIMESTAMP_LTZ(3),
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'elasticsearch-7',
   'hosts' = 'http://localhost:9200',
   'index' = 'enriched_records'
);
INSERT INTO enriched_records
SELECT
   o.id,
   o.user_id,
   o.create_time,
   o.proc_time,
   o.operation_time
FROM records AS o; 
{code}
We query the data in Elasticsearch using GET /enriched_records/_search and we
find that each record has an ingest_timestamp field whose value is the recent
ingest time.
{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "1970-01-01 00:00:00Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 789,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:21:40.233Z"
  }
}
{code}
When we modify a record in MySQL, the value of the ingest_timestamp field does
not change; it seems that the pipeline configured for this index is not applied
in this case.
{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "2023-01-28 05:25:05Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 987,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:25:05.529Z"
  }
}
{code}
If we modify a field directly in Elasticsearch, the value of the
ingest_timestamp field does change.
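A possible explanation, offered as an assumption rather than a confirmed
diagnosis: Elasticsearch ingest pipelines (including an index's
default_pipeline) only run for index operations, while the Flink connector
sends update/upsert requests for changelog updates on tables with a primary
key, and those bypass ingest pipelines. A sketch of the two request types with
the Elasticsearch 7 high-level client:
{code:java}
import java.util.Collections;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

// An index operation goes through the index's default_pipeline,
// so ingest_timestamp is set on the initial insert.
IndexRequest insert = new IndexRequest("enriched_records")
        .id("3")
        .source(Collections.singletonMap("user_id", 789));

// An update/upsert operation skips ingest pipelines,
// so ingest_timestamp keeps its old value after a MySQL update.
UpdateRequest update = new UpdateRequest("enriched_records", "3")
        .doc(Collections.singletonMap("user_id", 987))
        .docAsUpsert(true);
{code}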

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-290: Operator state compression (FLINK-30113)

2023-01-27 Thread Yuan Mei
+1 binding

Best
Yuan

On Sat, Jan 21, 2023 at 12:49 AM Rui Fan <1996fan...@gmail.com> wrote:

> +1 (no-binding)
>
> Best
> Rui Fan
>
> On Fri, Jan 20, 2023 at 10:46 PM ConradJam  wrote:
>
> > +1 (no-binding)
> > thanks driving it
> >
> > On Fri, 20 Jan 2023 at 21:16, Martijn Visser wrote:
> >
> > > +1 (binding)
> > >
> > > Thanks for driving this
> > >
> > > Best regards, Martijn
> > >
> > > On Fri, 20 Jan 2023 at 12:53, Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >
> > > > +1 binding
> > > >
> > > > On Fri, 20 Jan 2023 at 11:33, Dawid Wysakowicz wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Best,
> > > > >
> > > > > Dawid
> > > > >
> > > > > On 20/01/2023 11:24, Etienne Chauchot wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > after the discussion in [1], I would like to open a voting thread
> > for
> > > > > > FLIP-290 [2] which discusses operator state compression.
> > > > > >
> > > > > > The vote will be open until January 25th (72h + weekend), unless
> > > there
> > > > > > is an objection or not enough votes.
> > > > > >
> > > > > > Best
> > > > > >
> > > > > > Etienne
> > > > > >
> > > > > >
> > > > > > (1)
> > https://lists.apache.org/thread/mor8vtc8s1w53mq884l80y3nt6zybw1w
> > > > > >
> > > > > > (2)
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-290+Operator+state+compression
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best
> >
> > ConradJam
> >
>


[Discuss] Conventions on driving FLIPs towards consensus

2023-01-27 Thread Xintong Song
Hi devs,

Recently, a discussion about how to drive FLIPs towards consensus has taken
place on the private@ mailing list. While many PMC members have already
shared their opinions, we believe for this topic the opinions of other
committers / contributors are equally important. Therefore, we are moving
the discussion to the dev@ mailing list for a wider involvement.

### Background

Flink Improvement Proposal (FLIP) [1], the process for proposing,
discussing, reviewing and voting on major changes to Flink, plays an
important role in driving the project forward. According to the process,
*consensus*[2] is required for any proposal to be accepted. This means
objections from any single committer can block the process. It has been
observed many times that a FLIP is long blocked on a disapproving
committer. In most cases, this is necessary for addressing technical
concerns. However, there are also cases where, after raising concerns, the
committer becomes unresponsive (due to other works, personal vacation
plans, etc.), leaving the FLIP blocked for an unnecessarily long time.

The purpose of this discussion is to come up with some conventions on
preventing FLIPs from being long blocked on unresponsive reviewers.

### Summary of the previous discussion on private@

   - Most people agree that the progress of a FLIP should not be long
   blocked on an unresponsive reviewer. When a reviewer blocks the progress of
   a FLIP, he/she should be responsive to the subsequent replies, or at least
   provide a reasonable estimated time of response.
   - As for how long we should wait for the responses, there’s a tendency
   towards relying on the judgement of individuals while also having a
   typically recommended time (1 week).
   - Committers should use their veto rights with care. Per the ASF policy
   [3], vetos must be provided with a technical justification showing why the
   change is bad. They should not be used for simply blocking the process so
   the voter has more time to catch up.
   - We’d also encourage the FLIP proposers to actively reach out to the
   interested parties (e.g., previous contributors of the relevant part)
   early. It helps expose and address the potential concerns early and also
   leaves more time for other parties to respond while the proposer works on
   the FLIP.
   - We’d like to work on trust rather than heavy rules and processes. All
   above, if agreed, should be conventions among the community. We would not
   formally change the FLIP process.


Looking forward to your opinions.


Best,

Xintong


[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

[2]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Approvals

[3] https://www.apache.org/foundation/voting.html#Veto


[jira] [Created] (FLINK-30808) MultipleInputITCase failed with AdaptiveBatch Scheduler

2023-01-27 Thread Junrui Li (Jira)
Junrui Li created FLINK-30808:
-

 Summary: MultipleInputITCase failed with AdaptiveBatch Scheduler
 Key: FLINK-30808
 URL: https://issues.apache.org/jira/browse/FLINK-30808
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.0, 1.17.0
Reporter: Junrui Li
 Fix For: 1.17.0


MultipleInputITCase#testRelatedInputs failed with AdaptiveBatch Scheduler.
{code:java}
// code placeholder
java.lang.UnsupportedOperationException: Forward partitioning does not allow 
change of parallelism. Upstream operation: Calc[10]-14 parallelism: 1, 
downstream operation: HashJoin[15]-20 parallelism: 3 You must use another 
partitioning strategy, such as broadcast, rebalance, shuffle or global. {code}
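For context, a minimal sketch of the constraint behind this error (an assumed
standalone job, not the failing ITCase itself): FORWARD edges require the same
parallelism on both sides, so an explicit non-forward strategy avoids the
exception.
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForwardVsRebalance {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<String> calc =
                env.fromElements("a", "b", "c")
                        .map(s -> s).returns(Types.STRING)
                        .setParallelism(1); // the "Calc" side with parallelism 1

        // calc.forward().map(...).setParallelism(3) would fail as above:
        // forward partitioning does not allow a change of parallelism.
        calc.rebalance() // broadcast, shuffle or global would also work
                .map(String::toUpperCase).returns(Types.STRING)
                .setParallelism(3)
                .print();

        env.execute("forward-vs-rebalance");
    }
}
{code}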



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Reworking the Rescale API

2023-01-27 Thread ConradJam
Sorry I'm late to join discuss, I've gleaned a lot of useful information
from you guys

*@max*

   - when the user repartitions, we still need to restart the job. Can we try
   to do this part of the work internally instead of externally and, as
   *@konstantin* said, only trigger rescaling when a checkpoint or retained
   checkpoint completes, to minimize reprocessing

*@konstantin*

   - I think you mentioned that 2 FLIPs are being drafted, which I consider
   to be the precondition for achieving *@max*'s goal. I would love to join
   this discussion and contribute to it. I've tried a native implementation of
   this part myself; if I can help the community, that's the best I can do

*@chesnay*

   - The docs section is confusing and invites misconceptions, as *@gyula* says;
   I'll see if I can fix it


*About Rescale Api*

  Some limitations of and differences between *default* and *reactive mode* were
discussed earlier, and *@chesnay* explained some of their limitations and
behaviors; essentially they are two different things. I agree that when
reactive mode is ready, it should be offered as the *reactive mode* for the
default *stream processing* job.
  As for the *[1] Rescale API*, as we now know, it seems to be unusable; I
believe the goal of this API is to enable fast re-parallelization. I
would like to wait until the discussion is over and the 2 draft FLIPs
mentioned earlier are completed. It is not too late then to decide
whether to modify the *[2] Rescale REST API* to support
parallelism modification of job vertices


   1. https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/
   2. https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-rescaling


Best~



On Tue, 24 Jan 2023 at 01:08, Maximilian Michels wrote:

> Hi,
>
> The current rescale API appears to be a work in progress. A couple years
> ago, we disabled access to the API [1].
>
> I'm looking into this problem as part of working on autoscaling [2] where
> we currently require a full restart of the job to apply the parallelism
> overrides. This adds additional delay and comes with the caveat that we
> don't know whether sufficient resources are available prior to executing
> the scaling decision. We obviously do not want to get stuck due to a lack
> of resources. So a rescale API would have to ensure enough resources are
> available prior to restarting the job.
>
> I've created an issue here:
> https://issues.apache.org/jira/browse/FLINK-30773
>
> Any comments or interest in working on this?
>
> -Max
>
> [1] https://issues.apache.org/jira/browse/FLINK-12312
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>


-- 
Best

ConradJam


Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-01-27 Thread Vasiliki Papavasileiou
Hi Shammon,


Thank you for opening this FLIP which is very interesting and such an
important feature to add to the Flink ecosystem. I have a couple of
suggestions/questions:



   - Consistency is a very broad term with different meanings. There are many
   variations between the two extremes of weak and strong consistency that
   trade off latency for consistency. https://jepsen.io/consistency It would
   be great if we could devise an approach that allows the user to choose
   which consistency level they want to use for a query.


Example: In your figure where you have a DAG, assume a user queries only
Table1 for a specific key. Then, a failure happens and the table restores
from a checkpoint. The user issues the same query, looking up the same key.
What value does she see? With monotonic-reads, the system guarantees that
she will only see the same or newer values but not older, hence will not
experience time-travel. This is a very useful property for a system to have
albeit it is at the weaker-end of consistency guarantees. But it is a good
stepping stone.


Another example, assume the user queries Table1 for key K1 and gets the
value V11. Then, she queries Table2 that is derived from Table1 for the
same key, K1, that returns value V21. What is the relationship between V21
and V11? Is V21 derived from V11 or can it be an older value V1 (the
previous value of K1)? What if value V21 is not yet in table Table2? What
should she see when she queries Table1? Should she see the value V11 or not?
Should the requirement be that a record is not visible in any of the tables
in a DAG unless it is available in all of them?



   - It would be good to have a set of examples with consistency anomalies
   that can happen (like the examples above) and what consistency levels we
   want the system to offer to prevent them.
   Moreover, for each such example, it would be good to have a description
   of how the approach (Timestamp Barriers) will work in practice to prevent
   such anomalies.


Thank you,
Vicky


On Fri, Jan 27, 2023 at 4:46 PM John Roesler  wrote:

> Hello Shammon and all,
>
> Thanks for this FLIP! I've been working toward this kind of global
> consistency across large scale data infrastructure for a long time, and
> it's fantastic to see a high-profile effort like this come into play.
>
> I have been lurking in the discussion for a while and delaying my response
> while I collected my thoughts. However, I've realized at some point,
> delaying more is not as useful as just asking a few questions, so I'm sorry
> if some of this seems beside the point. I'll number these to not collide
> with prior discussion points:
>
> 10. Have you considered proposing a general consistency mechanism instead
> of restricting it to TableStore+ETL graphs? For example, it seems to me to
> be possible and valuable to define instead the contract that sources/sinks
> need to implement in order to participate in globally consistent snapshots.
>
> 11. It seems like this design is assuming that the "ETL Topology" under
> the envelope of the consistency model is a well-ordered set of jobs, but I
> suspect this is not the case for many organizations. It may be
> aspirational, but I think the gold-standard here would be to provide an
> entire organization with a consistency model spanning a loosely coupled
> ecosystem of jobs and data flows spanning teams and systems that are
> organizationally far apart.
>
> I realize that may be kind of abstract. Here's some examples of what's on
> my mind here:
>
> 11a. Engineering may operate one Flink cluster, and some other org, like
> Finance may operate another. In most cases, those are separate domains that
> don't typically get mixed together in jobs, but some people, like the CEO,
> would still benefit from being able to make a consistent query that spans
> arbitrary contexts within the business. How well can a feature like this
> transcend a single Flink infrastructure? Does it make sense to consider a
> model in which snapshots from different domains can be composable?
>
> 11b. Some groups may have a relatively stable set of long-running jobs,
> while others (like data science, skunkworks, etc) may adopt a more
> experimental, iterative approach with lots of jobs entering and exiting the
> ecosystem over time. It's still valuable to have them participate in the
> consistency model, but it seems like the consistency system will have to
> deal with more chaos than I see in the design. For example, how can this
> feature tolerate things like zombie jobs (which are registered in the
> system, but fail to check in for a long time, and then come back later).
>
> 12. I didn't see any statements about patterns like cycles in the ETL
> Topology. I'm aware that there are fundamental constraints on how well
> cyclic topologies can be supported by a distributed snapshot algorithm.
> However, there are a range of approaches/compromises that we can apply to
> cyclic topologies. At the very least, we can state that we will detect cycles
> and produce a warning, etc.

[jira] [Created] (FLINK-30807) State Processor API - Overwrite Support for Savepoints

2023-01-27 Thread Rion Williams (Jira)
Rion Williams created FLINK-30807:
-

 Summary: State Processor API - Overwrite Support for Savepoints
 Key: FLINK-30807
 URL: https://issues.apache.org/jira/browse/FLINK-30807
 Project: Flink
  Issue Type: New Feature
  Components: API / State Processor
Reporter: Rion Williams


Currently there is no overwrite support when using the State Processor API to 
create a savepoint at a given location. For applications that may run or 
generate a given savepoint on a periodic basis (e.g. cron job, nightly process, 
etc.) this can result in an exception if the job was previously run.

This ticket proposes amending the existing `SavepointWriter` class to support
passing the preferred overwrite mode as an optional parameter when writing the
savepoint, similar to the example below:

```
SavepointWriter
    .newSavepoint(env, new HashMapStateBackend(), maxParallelism)
    .withOperator(OperatorIdentifier.forUid("uid1"), transformation1)
    .withOperator(OperatorIdentifier.forUid("uid2"), transformation2)
    .write(savepointPath, FileSystem.WriteMode.OVERWRITE);
```

This corresponds with the underlying writer, which explicitly declares the
use of `FileSystem.WriteMode.NO_OVERWRITE` within the `FileCopyFunction` class,
as seen below:

```
public final class FileCopyFunction implements OutputFormat<Path> {
    ...

    @Override
    public void writeRecord(Path sourcePath) throws IOException {
        Path destPath = new Path(path, sourcePath.getName());
        try (FSDataOutputStream os =
                        destPath.getFileSystem()
                                .create(destPath, FileSystem.WriteMode.NO_OVERWRITE);
                FSDataInputStream is = sourcePath.getFileSystem().open(sourcePath)) {
            IOUtils.copyBytes(is, os);
        }
    }
    ...
}
```

An alternative solution might be to explicitly check for the existence of the
file at the destination and delete it, although the above seems much more
elegant.
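For illustration, a rough sketch of that alternative inside writeRecord, using
Flink's org.apache.flink.core.fs.FileSystem API (error handling omitted):

```
Path destPath = new Path(path, sourcePath.getName());
FileSystem fs = destPath.getFileSystem();

// remove a leftover file from a previous run before writing
if (fs.exists(destPath)) {
    fs.delete(destPath, false); // non-recursive: destPath is a single file
}

try (FSDataOutputStream os = fs.create(destPath, FileSystem.WriteMode.NO_OVERWRITE);
        FSDataInputStream is = sourcePath.getFileSystem().open(sourcePath)) {
    IOUtils.copyBytes(is, os);
}
```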



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Externalize AWS formats to flink-connector-aws

2023-01-27 Thread Jing Ge
+1 Thanks for driving it!

Best regards,
Jing

On Fri, Jan 27, 2023 at 2:36 PM Danny Cranmer 
wrote:

> Hello Hong, thanks for driving this.
>
> +1 on the proposal and the voting schema looks good.
>
> Thanks
>
> On Thu, Jan 26, 2023 at 5:15 PM Teoh, Hong 
> wrote:
>
> > Hi all,
> >
> > As discussed in the discussion thread [1], I would like to propose we
> > externalize flink-avro-glue-schema-registry and
> > flink-json-glue-schema-registry formats to the flink-connector-aws
> > repository [2].
> >
> > Motivation:
> > 1. We can unify and upgrade the AWS SDK versions more easily.
> > 2. We can now move flink-connector-aws-base to the external repository as
> > well.
> >
> > Voting Schema:
> > Consensus, committers have binding votes, open for at least 72 hours.
> >
> > Thanks,
> > Hong
> >
> >
> > [1] https://lists.apache.org/thread/03l99yz62mq3ngj8cvg8stk4foym65jq
> > [2] https://github.com/apache/flink-connector-aws/
> >
>


[jira] [Created] (FLINK-30806) Bump protobuf-java dependency version for aws connectors

2023-01-27 Thread Aleksandr Pilipenko (Jira)
Aleksandr Pilipenko created FLINK-30806:
---

 Summary: Bump protobuf-java dependency version for aws connectors
 Key: FLINK-30806
 URL: https://issues.apache.org/jira/browse/FLINK-30806
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / AWS
Reporter: Aleksandr Pilipenko
 Fix For: aws-connector-4.1.0, aws-connector-3.1.0


Update protobuf-java dependency version to at least 3.19.4 to match version 
used in kinesis-client-library.

Use 3.21.7 to match version used in Flink if there are no compatibility issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30805) SplitEnumerator#handleSplitRequest() should be called automatically

2023-01-27 Thread Etienne Chauchot (Jira)
Etienne Chauchot created FLINK-30805:


 Summary: SplitEnumerator#handleSplitRequest() should be called 
automatically
 Key: FLINK-30805
 URL: https://issues.apache.org/jira/browse/FLINK-30805
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Reporter: Etienne Chauchot


SplitEnumerator#handleSplitRequest() is not called automatically by the new
source framework, which could be surprising to a source author. Right now a
source author has to trigger it himself when a split is finished, or early
when the reader gets created.
IMHO it would be good if we could find a way for the framework to call it
automatically when a split is finished.
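For illustration, a sketch of the manual pattern described above, assuming a
reader built on SourceReaderBase (MySplitState is a placeholder type); it is
the reader-side sendSplitRequest() call that makes the framework invoke
SplitEnumerator#handleSplitRequest() on the enumerator:
{code:java}
@Override
public void start() {
    // "early when the reader gets created"
    context.sendSplitRequest();
}

@Override
protected void onSplitFinished(Map<String, MySplitState> finishedSplits) {
    // "when a split is finished"
    context.sendSplitRequest();
}
{code}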



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-01-27 Thread John Roesler
Hello Shammon and all,

Thanks for this FLIP! I've been working toward this kind of global consistency 
across large scale data infrastructure for a long time, and it's fantastic to 
see a high-profile effort like this come into play.

I have been lurking in the discussion for a while and delaying my response 
while I collected my thoughts. However, I've realized at some point, delaying 
more is not as useful as just asking a few questions, so I'm sorry if some of 
this seems beside the point. I'll number these to not collide with prior 
discussion points:

10. Have you considered proposing a general consistency mechanism instead of 
restricting it to TableStore+ETL graphs? For example, it seems to me to be 
possible and valuable to define instead the contract that sources/sinks need to 
implement in order to participate in globally consistent snapshots.

11. It seems like this design is assuming that the "ETL Topology" under the 
envelope of the consistency model is a well-ordered set of jobs, but I suspect 
this is not the case for many organizations. It may be aspirational, but I 
think the gold-standard here would be to provide an entire organization with a 
consistency model spanning a loosely coupled ecosystem of jobs and data flows 
spanning teams and systems that are organizationally far apart.

I realize that may be kind of abstract. Here's some examples of what's on my 
mind here: 

11a. Engineering may operate one Flink cluster, and some other org, like 
Finance may operate another. In most cases, those are separate domains that 
don't typically get mixed together in jobs, but some people, like the CEO, 
would still benefit from being able to make a consistent query that spans 
arbitrary contexts within the business. How well can a feature like this 
transcend a single Flink infrastructure? Does it make sense to consider a model 
in which snapshots from different domains can be composable?

11b. Some groups may have a relatively stable set of long-running jobs, while 
others (like data science, skunkworks, etc) may adopt a more experimental, 
iterative approach with lots of jobs entering and exiting the ecosystem over 
time. It's still valuable to have them participate in the consistency model, 
but it seems like the consistency system will have to deal with more chaos than 
I see in the design. For example, how can this feature tolerate things like 
zombie jobs (which are registered in the system, but fail to check in for a 
long time, and then come back later).

12. I didn't see any statements about patterns like cycles in the ETL Topology. 
I'm aware that there are fundamental constraints on how well cyclic topologies 
can be supported by a distributed snapshot algorithm. However, there are a 
range of approaches/compromises that we can apply to cyclic topologies. At the 
very least, we can state that we will detect cycles and produce a warning, etc.

13. I'm not sure how heavily you're waiting the query syntax part of the 
proposal, so please feel free to defer this point. It looked to me like the 
proposal assumes people want to query either the latest consistent snapshot or 
the latest inconsistent state. However, it seems like there's a significant 
opportunity to maintain a manifest of historical snapshots and allow people to 
query as of old points in time. That can be valuable for individuals answering 
data questions, building products, and crucially supporting auditability use 
cases. To that latter point, it seems nice to provide not only a mechanism to 
query arbitrary snapshots, but also to define a TTL/GC model that allows users 
to keep hourly snapshots for N hours, daily snapshots for N days, weekly 
snapshots for N weeks, and the same for monthly, quarterly, and yearly 
snapshots.

Ok, that's all I have for now :) I'd also like to understand some lower-level 
details, but I wanted to get these high-level questions off my chest.

Thanks again for the FLIP!
-John

On 2023/01/13 11:43:28 Shammon FY wrote:
> Hi Piotr,
> 
> I discussed with @jinsong lee about `Timestamp Barrier` and `Aligned
> Checkpoint` for data consistency in FLIP, we think there are many defects
> indeed in using `Aligned Checkpoint` to support data consistency as you
> mentioned.
> 
> According to our historical discussion, I think we have reached an
> agreement on an important point: we finally need `Timestamp Barrier
> Mechanism` to support data consistency. But according to our (@jinsong lee
> and I) opinions, the total design and implementation based on 'Timestamp
> Barrier' will be too complex, and it's also too big in one FLIP.
> 
> So we‘d like to use FLIP-276[1] as an overview design of data consistency
> in Flink Streaming and Batch ETL based on `Timestamp Barrier`. @jinsong and
> I hope that we can reach an agreement on the overall design in FLINK-276
> first, and then on the basic of FLIP-276 we can create other FLIPs with
> detailed design according to modules and drive them. Finally, we can
> 

Re: [VOTE] Release 1.16.1, release candidate #1

2023-01-27 Thread Gyula Fóra
+1 (binding)

* Verified signatures and hashes
* Built from source, ran some examples on Kubernetes
* Verified NOTICE and LICENSE files

Gyula

On Fri, Jan 27, 2023 at 3:51 PM Teoh, Hong 
wrote:

> +1 (non-binding)
>
> * Hashes and Signatures look good
> * All required files on dist.apache.org
> * Source archive builds using maven
> * Started packaged example WordCountSQLExample job.
>
> Cheers,
> Hong
>
>
>
> On 27/01/2023, 07:52, "Gabor Somogyi"  wrote:
>
> +1 (non-binding)
>
> * Verified versions in the poms
> * Built from source
> * Verified checksums and signatures
> * Started basic workloads with kubernetes operator
> * Verified NOTICE and LICENSE files
>
> G
>
>
> On Thu, Jan 26, 2023 at 2:28 PM Dawid Wysakowicz <
> dwysakow...@apache.org>
> wrote:
>
> > +1 (binding)
> >
> >- verified checksums & signatures
> >- checked differences of pom.xml and NOTICE files with 1.16.0
> release.
> >looks good
> >- checked source release contains no binaries
> >- built from sources
> >- run StateMachineExample on a local cluster
> >- checked the web PR
> >
> > Best,
> >
> > Dawid
> > On 19/01/2023 17:07, Martijn Visser wrote:
> >
> > Hi everyone,
> > Please review and vote on the release candidate #1 for the version
> 1.16.1,
> > as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> > The complete staging area is available for your review, which
> includes:
> > * JIRA release notes [1],
> > * the official Apache source release and binary convenience releases
> to be
> > deployed to dist.apache.org [2], which are signed with the key with
> > fingerprint A5F3BCE4CBE993573EC5966A65321B8382B219AF [3] (,
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag "release-1.16.1-rc1" [5],
> > * website pull request listing the new release and adding
> announcement blog
> > post [6].
> >
> > The vote will be open for at least 72 hours. It is adopted by
> majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Release Manager
> >
> > NOTE: The maven artifacts have been signed by Chesnay with the key
> with
> > fingerprint C2EED7B111D464BA
> >
> > [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352344
> > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.16.1-rc1/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1580
> > [5] https://github.com/apache/flink/releases/tag/release-1.16.1-rc1
> > [6] https://github.com/apache/flink-web/pull/603
> >
> >
>
>


[jira] [Created] (FLINK-30804) flink-kubernetes-operator helm chart - allow specifying operator pod resources requests/limits

2023-01-27 Thread Vincent Chenal (Jira)
Vincent Chenal created FLINK-30804:
--

 Summary: flink-kubernetes-operator helm chart - allow specifying 
operator pod resources requests/limits
 Key: FLINK-30804
 URL: https://issues.apache.org/jira/browse/FLINK-30804
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.1
Reporter: Vincent Chenal


The flink-kubernetes-operator helm chart does not allow specifying resource
requests/limits for the operator pod. It would be nice to have such configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30803) PyFlink mishandles script dependencies

2023-01-27 Thread Nuno Afonso (Jira)
Nuno Afonso created FLINK-30803:
---

 Summary: PyFlink mishandles script dependencies
 Key: FLINK-30803
 URL: https://issues.apache.org/jira/browse/FLINK-30803
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.3, 1.15.2, 1.16.0
Reporter: Nuno Afonso
 Attachments: word_count_split.zip

h2. Summary

Since Flink 1.15, PyFlink is unable to run scripts that import other scripts 
under other directories. For instance, if _main.py_ imports 
{_}job/word_count.py{_}, PyFlink will fail due to not finding the _job_ 
directory.

The issue seems to have started after a [refactoring of 
_PythonDriver_|https://github.com/apache/flink/commit/330aae0c6e0811f50888d17830f10f7a29efe7d7]
 to address FLINK-26847. The path to the Python script is removed, which forces 
PyFlink to use the copy in its temporary directory. When files are copied to 
this directory, the original directory structure is not maintained and ends up 
breaking the imports.
h2. Testing

To confirm the regression, I ran the attached application in both a Flink 
1.14.6 and 1.15.3 clusters.
h3. Flink 1.14.6

Application was able to start after being submitted via CLI:

 
{code:java}
% ./bin/flink run --python ~/sandbox/word_count_split/main.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/.../flink-1.14.6/lib/flink-dist_2.12-1.14.6.jar) to field 
java.lang.String.value
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID 6f7be21072384ca3a314af10860c4ba8 {code}
 
h3. Flink 1.15.3

Application did not start due to not finding the _job_ directory:

 
{code:java}
% ./bin/flink run --python ~/sandbox/word_count_split/main.py
Traceback (most recent call last):
  File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File 
"/tmp/pyflink/40c649c3-24af-46ef-ae27-e0019cb55769/3673dd18-adff-40e0-bb11-06a3f00ba29c/main.py",
 line 5, in <module>
    from job.word_count import word_count
ModuleNotFoundError: No module named 'job'
org.apache.flink.client.program.ProgramAbortException: 
java.lang.RuntimeException: Python process exits with code: 1
        at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:841)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
        at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1085)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1163)
        at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1163)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
        ... 13 more {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release 1.16.1, release candidate #1

2023-01-27 Thread Teoh, Hong
+1 (non-binding)

* Hashes and Signatures look good
* All required files on dist.apache.org
* Source archive builds using maven
* Started packaged example WordCountSQLExample job.

Cheers,
Hong



On 27/01/2023, 07:52, "Gabor Somogyi"  wrote:

+1 (non-binding)

* Verified versions in the poms
* Built from source
* Verified checksums and signatures
* Started basic workloads with kubernetes operator
* Verified NOTICE and LICENSE files

G


On Thu, Jan 26, 2023 at 2:28 PM Dawid Wysakowicz 
wrote:

> +1 (binding)
>
>- verified checksums & signatures
>- checked differences of pom.xml and NOTICE files with 1.16.0 release.
>looks good
>- checked source release contains no binaries
>- built from sources
>- run StateMachineExample on a local cluster
>- checked the web PR
>
> Best,
>
> Dawid
> On 19/01/2023 17:07, Martijn Visser wrote:
>
> Hi everyone,
> Please review and vote on the release candidate #1 for the version 1.16.1,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint A5F3BCE4CBE993573EC5966A65321B8382B219AF [3] (,
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.16.1-rc1" [5],
> * website pull request listing the new release and adding announcement 
blog
> post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Release Manager
>
> NOTE: The maven artifacts have been signed by Chesnay with the key with
> fingerprint C2EED7B111D464BA
>
> 
[1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352344
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.16.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1580
> [5] https://github.com/apache/flink/releases/tag/release-1.16.1-rc1
> [6] https://github.com/apache/flink-web/pull/603
>
>



[jira] [Created] (FLINK-30802) Improve SplitReader#fetch() documentation

2023-01-27 Thread Etienne Chauchot (Jira)
Etienne Chauchot created FLINK-30802:


 Summary: Improve SplitReader#fetch() documentation
 Key: FLINK-30802
 URL: https://issues.apache.org/jira/browse/FLINK-30802
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common, Documentation
Reporter: Etienne Chauchot


[SplitReader#fetch()|https://nightlies.apache.org/flink/flink-docs-master/api/java/]
 lacks details on the fact that source authors can decide to interrupt it (e.g.
for performance reasons) so that it is resumed later based on its state.
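For example, the documentation could include a sketch along these lines (the
record/batch types are placeholders), showing a fetch() that returns early on
wakeUp() and resumes from its own position state on the next call:
{code:java}
private volatile boolean wakenUp;
private long position; // progress kept across fetch() calls

@Override
public RecordsWithSplitIds<MyRecord> fetch() throws IOException {
    MyRecordsBatch batch = new MyRecordsBatch(); // placeholder RecordsWithSplitIds impl
    while (!wakenUp && batch.size() < MAX_BATCH_SIZE) {
        batch.add(readRecordAt(position++)); // placeholder I/O
    }
    wakenUp = false;
    return batch; // the framework calls fetch() again later; we resume at `position`
}

@Override
public void wakeUp() {
    wakenUp = true;
}
{code}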



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-connector-gcp-pubsub v3.0.0, release candidate #1

2023-01-27 Thread Danny Cranmer
Hey Martijn, thanks for driving yet another connector release.

Before I cast my vote I have a couple of queries:
* Contents of dist. Why are we publishing the e2e tests jar [1]? We do not
typically do this, but I see the old GCP e2e tests are published [2],
however not to maven central.
* NOTICE and LICENSE look good, but I have a query on the copyright
"Copyright 2014-2022 The Apache Software Foundation". When do we update to
2023?

Rest is ok:
* Release notes look good
* Verified signature and hashes or source archive and jar from dist
* Verified there are no binaries in the source archive
* Builds from src
* Reviewed web PR
* Tag present in Github

Thanks,
Danny

[1]
https://repository.apache.org/content/repositories/orgapacheflink-1581/org/apache/flink/flink-connector-gcp-pubsub-e2e-tests/
[2]
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-gcp-pubsub-emulator-tests

On Thu, Jan 26, 2023 at 7:06 PM Sergey Nuyanzin  wrote:

> +1 (non-binding)
>
> * verified hashes and signatures
> * verified versions in pom files
> * verified LICENSE and NOTICE files
> * compared sources against git tag
> * built from sources
>
>
> On Thu, Jan 26, 2023 at 3:08 PM Konstantin Knauf 
> wrote:
>
> > +1 (binding)
> >
> > * checked Maven and source artifact signatures and checksums - OK
> > * no binaries or packaged dependencies - OK
> > * checked website changes - Approved.
> >
> > Am Fr., 20. Jan. 2023 um 15:39 Uhr schrieb Martijn Visser <
> > martijnvis...@apache.org>:
> >
> > > Hi everyone,
> > > Please review and vote on the release candidate #1 for the version
> 3.0.0,
> > > as follows:
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source release to be deployed to dist.apache.org
> > > [2],
> > > which are signed with the key with fingerprint
> > > A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > * source code tag v3.0.0-rc1 [5],
> > > * website pull request listing the new release [6].
> > >
> > > The vote will be open for at least 72 hours. It is adopted by majority
> > > approval, with at least 3 PMC affirmative votes.
> > >
> > > Thanks,
> > > Release Manager
> > >
> > > [1]
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352589
> > > [2]
> > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-gcp-pubsub-3.0.0-rc1
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1581/
> > > [5]
> > >
> > >
> >
> https://github.com/apache/flink-connector-gcp-pubsub/releases/tag/v3.0.0-rc1
> > > [6] https://github.com/apache/flink-web/pull/604
> > >
> >
> >
> > --
> > https://twitter.com/snntrable
> > https://github.com/knaufk
> >
>
>
> --
> Best regards,
> Sergey
>


Re: [VOTE] Externalize AWS formats to flink-connector-aws

2023-01-27 Thread Danny Cranmer
Hello Hong, thanks for driving this.

+1 on the proposal and the voting schema looks good.

Thanks

On Thu, Jan 26, 2023 at 5:15 PM Teoh, Hong 
wrote:

> Hi all,
>
> As discussed in the discussion thread [1], I would like to propose we
> externalize flink-avro-glue-schema-registry and
> flink-json-glue-schema-registry formats to the flink-connector-aws
> repository [2].
>
> Motivation:
> 1. We can unify and upgrade the AWS SDK versions more easily.
> 2. We can now move flink-connector-aws-base to the external repository as
> well.
>
> Voting Schema:
> Consensus, committers have binding votes, open for at least 72 hours.
>
> Thanks,
> Hong
>
>
> [1] https://lists.apache.org/thread/03l99yz62mq3ngj8cvg8stk4foym65jq
> [2] https://github.com/apache/flink-connector-aws/
>


Re: Confluent Avro and Debezium formats - default schema name can be incompatible with registered schema name

2023-01-27 Thread Dawid Wysakowicz

Hi Fruzsina,

I think this is a valid issue we should try to solve. A different
approach I am thinking about is that we could actually add an option to
provide an entire Avro schema to use. Something like
`avro-confluent.schema`, which we would validate maps properly to the
schema of the table (that is, names of fields and their types match) and
use instead of the generated one.


What do you think about that approach?

Best,

Dawid

On 26/01/2023 11:29, Fruzsina Nagy wrote:

Hi everyone,

I have come across the below issue, while experimenting with the Confluent 
registry and avro-confluent, debezium-avro-confluent formats. Please let me 
know your thoughts on it. Should this issue be addressed?

Thanks in advance,
Fruzsina
The use case

Create a new topic on Confluent Cloud
Create a value schema with the name “sampleRecord”:
{
   "type": "record",
   "namespace": "com.mycorp.mynamespace",
   "name": "sampleRecord",
…}
Create table with “avro-confluent” format:
CREATE TABLE `newtesttopic` (
  `my_field1` INT NOT NULL,
  `my_field2` DOUBLE NOT NULL,
  `my_field3` VARCHAR(2147483647) NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'newtesttopic',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'bootstrapServers',
  'properties.sasl.jaas.config' = 'saslJaasConfig',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.security.protocol' = 'SASL_SSL',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'confluentSchemaRegUrl',
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'user:pw')

Insert data into the “newtesttopic”
The following error is thrown:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject 
"newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH, location:/name, message:expected: com.mycorp.mynamespace.sampleRecord, 
reader:{"type":"record","name":"record",...}, 
writer:{"type":"record","name":"sampleRecord",...}
This error can of course be avoided if we don’t register a schema for our topic
on the Confluent Cloud site before inserting data into the Kafka table, and we
just let Flink register it for us with the name “record”.

The cause of the error

I found that the error is caused by the
EncodingFormat<SerializationSchema<RowData>> created by
RegistryAvroFormatFactory.createEncodingFormat, because when creating an
AvroRowDataSerializationSchema, it uses
AvroSchemaConverter.convertToSchema(LogicalType schema), which names the schema
“record” by default.

But the registered schema is named “sampleRecord” in the above example, so the 
Confluent Schema Registry client doesn’t accept it.
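A minimal sketch (assuming flink-avro and the table logical types on the
classpath) that shows the default name:

    RowType rowType = RowType.of(new IntType(), new DoubleType(), new VarCharType(100));
    org.apache.avro.Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
    System.out.println(avroSchema.getName()); // prints "record", not "sampleRecord"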
The problem

To resolve this I added a new option “schema-name” to “avro-confluent” and 
“debezium-avro-confluent” formats. And as I was testing the 
“debezium-avro-confluent” format, it turned out that this solution doesn’t 
solve the problem in those cases when there are named schemas (record, enum, 
fixed types) nested in the schema of the topic.

For example:
In case of “debezium-avro-confluent” the schema created is a union of null and a
Debezium-specific record schema (before, after, op). If I use the above option to
provide a specific name for the schema, I get an
org.apache.avro.UnresolvedUnionException, because AvroRowDataSerializationSchema
converts the RowType to a record schema with the name “record”, which will not be
found in the union if the Debezium-specific record has a different name.
Union type is problematic because in the general case, if we define a union
schema [schema1, schema2], meaning that the schema is either schema1 or schema2,
we must somehow determine which schema we are converting the RowType to.

In case of nested named schemas, Flink creates a name based on the record name
and the field name. The Schema Registry client will also throw an error in this
case if the registered names don’t match.

Possible solutions

Look up names of the schemas in the field comment, e.g. if there is a field of type 
ROW with a comment “avro-name = 

[jira] [Created] (FLINK-30801) Migrate LeaderElection-related unit tests to JUnit5

2023-01-27 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30801:
-

 Summary: Migrate LeaderElection-related unit tests to JUnit5
 Key: FLINK-30801
 URL: https://issues.apache.org/jira/browse/FLINK-30801
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.17.0
Reporter: Matthias Pohl


To prepare the merge of the {{MultipleComponentLeaderElectionService}}-related 
tests with the legacy test, we want to align the JUnit version they are using.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Reworking the Rescale API

2023-01-27 Thread Chesnay Schepler
It is certainly true that the messaging around the AS/reactive mode 
wasn't good.


In part this happened because initially we only intended to advertise 
reactive mode (at the time), and only later figured that the AS on its
own could already be useful too.


That being said, I'm not sure how to improve the elastic scaling page;
can you make specific suggestions?
For example, I don't see it being implied that using the adaptive
scheduler uses reactive mode by default (which it doesn't).
Some tight coupling is of course required because reactive mode requires 
the AS, but to me the page makes it clear that you can use the AS on 
it's own.


The only thing I can think of is making reactive mode a sub-section of 
the AS.
I may have just been too involved to really see the problems in the 
docs; I'd appreciate any help you can give to improve them.
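
For reference, a minimal flink-conf.yaml sketch of how the two are enabled 
independently today (option names assumed from recent Flink releases; 
reactive mode works on standalone clusters only):

# adaptive scheduler without reactive mode
jobmanager.scheduler: adaptive

# reactive mode; implies the adaptive scheduler
scheduler-mode: reactive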



Re: Reworking the Rescale API

2023-01-27 Thread Gyula Fóra
Also @David Morávek  @Chesnay Schepler 


It would be great if you could update the respective docs page before
publishing your improvement FLIPs about the adaptive scheduler:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/

I think many of the confusions/misconceptions stem from this docs page, as it
ties the Adaptive Scheduler and Reactive Mode closely together.
It also clearly implies that if you use the adaptive scheduler, then
reactive mode is the default behaviour, which is probably not what we want
if we want users to switch over from the current standard scheduler.

We should clearly separate the features of the Adaptive Scheduler from
reactive mode, which is tailored for very specific use cases and setups
(standalone only) and requires careful configuration before production use.
Reactive mode should be an opt-in feature.

Gyula


Re: Reworking the Rescale API

2023-01-27 Thread Gyula Fóra
Thank you @Chesnay Schepler  @David Morávek


I think in that case our primary goal should be to make sure that streaming
jobs always use the adaptive scheduler.
It then also makes perfect sense to build the rescale API improvements for
that scheduler specifically.

However, we should have a clear plan for making this the default and
communicate it to users. Otherwise, adding more and more
adaptive-scheduler-specific features will only cause further confusion.

Cheers,
Gyula


Re: Reworking the Rescale API

2023-01-27 Thread David Morávek
>
> The adaptive scheduler only supports streaming jobs. That's the biggest
> limitation that probably won't be fixed anytime soon.


Since FLIP-283 [1] has been accepted, I think this limitation might have
already been addressed to a certain extent. I'd be completely fine with
having a separate scheduler for batch and streaming (maybe we could build a
hybrid one at some point that automatically switches between the two).

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs



Re: Reworking the Rescale API

2023-01-27 Thread Chesnay Schepler
The adaptive scheduler only supports streaming jobs. That's the biggest 
limitation that probably won't be fixed anytime soon.
The goal, though, was to eventually make the adaptive scheduler the default 
for streaming jobs; it was very much meant as a better version of the default 
scheduler for streaming jobs.


On 26/01/2023 19:06, David Morávek wrote:

Hi Gyula,



can you please explain why the AdaptiveScheduler is not the default
scheduler?


There are still some smaller bits missing. As far as I know, the missing
parts are:

1) Local recovery (reusing the already downloaded state files after restart
/ rescale)
2) Support for fine-grained resource management
3) Support for the session cluster (Chesnay will be submitting a FLIP for
this soon)

We're looking into addressing all of these limitations in the short term.

Personally, I'd love to start a discussion about making transitioning the
AdaptiveScheduler into a default one after those limitations are fixed.
Being able to eventually deprecate and remove the DefaultScheduler would
simplify the code-base by a lot since there are many adapters between new
and old interfaces (eg. SlotPool-related interfaces).

Best,
D.

On Thu, Jan 26, 2023 at 6:27 PM Gyula Fóra  wrote:


Chesnay,

Seems like you are suggesting that the Adaptive scheduler does everything
the standard scheduler does and more.

I am clearly not an expert on this topic but can you please explain why the
AdaptiveScheduler is not the default scheduler?
If it can do everything, why do we even have 2 schedulers? Why not simply
drop the "old" one?

That would probably clear up all confusions then :)

Gyula

On Thu, Jan 26, 2023 at 6:23 PM Chesnay Schepler 
wrote:


There's the default and reactive mode; nothing else.
At its core they are the same thing; reactive mode just cranks up the
desired parallelism to infinity and enforces certain assumptions (e.g.,
no active resource management).

The advantage is that the adaptive scheduler can run jobs while sufficient
resources are not yet available, and scale things up again once they
are available.
This is its core functionality, but we always intended to extend it
such that users can modify the parallelism at runtime as well.
And since the AS can already rescale jobs (and was purpose-built with
that functionality in mind), this is just a matter of exposing an API
for it. Everything else is already there.

As a concrete use-case, let's say you have an SLA that says jobs must
not be down longer than X seconds, and a TM just crashed.
If you can absolutely guarantee that your k8s cluster can provision a
new TM within X seconds, no matter what cruel reality has in store for
you, then you /may/ not need it.
If you can't, well then here's a use-case for you.

  > Last time I looked they implemented the same interface and the same
base class. Of course, their behavior is quite different.

They never shared a base class since day 1. Are you maybe mixing up the
AdaptiveScheduler and AdaptiveBatchScheduler?

As for FLINK-30773, I think that should be covered.

On 26/01/2023 17:10, Maximilian Michels wrote:

Thanks for the explanation. If not for the "reactive mode", what is
the advantage of the adaptive scheduler? What other modes does it
support?


Apart from implementing the same interface, the implementations of the
adaptive and default schedulers are separate.

Last time I looked they implemented the same interface and the same
base class. Of course, their behavior is quite different.

I'm still very interested in learning about the future FLIPs
mentioned. Based on the replies, I'm assuming that they will support
the changes required for
https://issues.apache.org/jira/browse/FLINK-30773, or at least provide
the basis for implementing them.

-Max

On Thu, Jan 26, 2023 at 4:57 PM Chesnay Schepler wrote:

On 26/01/2023 16:18, Maximilian Michels wrote:

I see slightly different goals for the standard and the adaptive
scheduler. The adaptive scheduler's goal is to adapt the Flink job
according to the available resources.

This is really a misconception that we just have to stomp out.

This statement only applies to reactive mode, a special mode the adaptive
scheduler (AS) can run in, where active resource management is not supported
since requesting infinite resources from k8s doesn't really make sense.

The AS itself can work perfectly fine with active resource management,
and has no effect on how the RM talks to k8s. It can just keep the job
running in cases where fewer resources than desired (== user-provided
parallelism) are provided by k8s (possibly temporarily).

On 26/01/2023 16:18, Maximilian Michels wrote:

After all, both schedulers share the same super class

Apart from implementing the same interface, the implementations of the
adaptive and default schedulers are separate.






[jira] [Created] (FLINK-30800) Add doc entry how to re-generate java doc

2023-01-27 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-30800:
-

 Summary: Add doc entry how to re-generate java doc
 Key: FLINK-30800
 URL: https://issues.apache.org/jira/browse/FLINK-30800
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gabor Somogyi


At the moment, only the CI notifies people.


