Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-03 Thread Konstantin Knauf
Hi everyone,

sorry for joining the discussion late.

1) Is there an option to deprecate SinkFunction in Flink 1.17 while leaving
SinkV2 @PublicEvolving in Flink 1.17? We would then aim to make SinkV2
@Public and remove SinkFunction in Flink 1.18. @PublicEvolving APIs are
intended for public use. So, I don't see it as a blocker for deprecating
SinkFunction that SinkV2 is still @PublicEvolving. For reference, this is
the description of @PublicEvolving:

/**
 * Annotation to mark classes and methods for public use, but with evolving interfaces.
 *
 * Classes and methods with this annotation are intended for public use and have stable
 * behavior. However, their interfaces and signatures are not considered to be stable and
 * might be changed across versions.
 *
 * This annotation also excludes methods and classes with evolving interfaces / signatures
 * within classes annotated with {@link Public}.
 */
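
For illustration, the combination proposed in 1) would roughly amount to the
following annotation state in 1.17 (a sketch only; class bodies elided):

// Old API: deprecated in 1.17, to be removed in 1.18.
@Public
@Deprecated
public interface SinkFunction<IN> extends Function, Serializable { /* ... */ }

// New API: stays @PublicEvolving in 1.17 and graduates to @Public in 1.18.
@PublicEvolving
public interface Sink<InputT> extends Serializable { /* ... */ }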


Marking SinkFunction @Deprecated would already signal to everyone to move to
SinkV2, which we as a community, I believe, have a strong interest in. It's
almost comical how long the transition from SourceFunction/SinkFunction to
Source/Sink is taking us. At the same time, we leave ourselves the option to
make small changes to SinkV2 if any problems arise during the migration of
these connectors.

I think we have a bit of a chicken/egg problem here. The pressure for
users and contributors is not high enough to move away from SinkFunction as
long as it's not deprecated, but at the same time we need people to migrate
their connectors to see if there are any gaps in SinkV2. I believe the
combination proposed above could bridge this gap.

2) I don't understand the argument of waiting until some of the
implementations are @Public. How can we make the implementations of the
SinkV2 API @Public without making SinkV2 @Public? All public methods of
SinkV2 are part of every implementation. So to me it actually seems to be the
opposite: in order to make any of the implementations @Public, we first need
to make the API @Public.

Cheers,

Konstantin

On Mon, Jan 30, 2023 at 1:18 PM, Dong Lin wrote:

> Hi Martijn,
>
> Thanks for driving this effort to clean up the Flink codebase!
>
> I like the idea of cleaning up the Flink codebase to avoid having two Sinks.
> On the other hand, I also think the concern mentioned by Jing makes sense. In
> addition to thinking in terms of the rule proposed in FLIP-197
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
> >
> (which
> seems to focus mostly on the Flink developers' perspective), it might be
> useful to also think about the story from users' perspective and make sure
> their concerns can be addressed.
>
> Typically, by marking an API as deprecated, we are effectively telling
> users *they should start to migrate to the new API ASAP and we reserve the
> right to remove this API completely in the next 1-2 releases*. Then it might be
> reasonable for users to ask questions such as:
>
> 1) Does the SinkV2 public API provide all the functionalities needed to
> migrate my existing code from subclassing SinkFunction to subclassing
> SinkV2?
>
> 2) Is the amount of migration work reasonable? If yes, why is a class such
> as HBaseSinkFunction in Flink's own codebase still depending on
> SinkFunction? Maybe Flink developers should eat their own dog food and
> migrate (or deprecate) those classes in the Flink codebase first?
>
> Based on the discussion in this thread, I am not sure we have good answers
> to those questions yet. For the 1st question above, the answer is *no*
> because we already know that the SinkV2 is currently not able to support
> migration for JdbcSink. For the 2nd question above, we know there are many
> non-deprecated classes in the Flink codebase that still depend on SinkFunction.
> It is probably not nice to users if we tell them to migrate while we know
> there are existing issues that can prevent them from doing so easily.
>
> In order to move forward with deprecating SinkFunction, I think it will be
> super useful to first migrate all the connectors managed by the Flink community
> (including all externalized connectors) to use SinkV2. This work won't be
> wasted since we need to do this anyway. And it will also give us a chance
> to validate the capabilities of SinkV2 and fix bugs by ourselves as much as
> possible.
>
> What do you think?
>
> Best Regards,
> Dong
>
>
> On Wed, Jan 18, 2023 at 6:52 PM Martijn Visser 
> wrote:
>
> > Hi all,
> >
> > While discussing FLIP-281 [1] the discussion also turned to the
> > SinkFunction and the SinkV2 API. For a broader discussion I'm opening up
> a
> > separate discussion thread.
> >
> > As Yun Tang has mentioned in that discussion thread, it would be a good
> > time to deprecate the SinkFunction to avoid the need to introduce new
> > functions for (to be) deprecated APIs. Jing rightfully mentioned that
> > it would be confusing to deprecate the SinkFunction if its successor is
> > not yet marked as @Public.

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-02-03 Thread Konstantin Knauf
Hi everyone,

if I am not mistaken, some of the sinks mentioned by Joao (Kafka and
Kinesis) already use the SinkV2 API. How were those implemented without
exposing the ExecutionConfig?
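
For reference, the read-only contract discussed in the quoted mails below
could be sketched roughly as follows (using the "ReadableExecutionConfig"
name from Lijie's reply; the selection of getters is illustrative only):

public interface ReadableExecutionConfig {
    // a read-only view of ExecutionConfig: only is/get methods, no setters
    int getParallelism();
    int getMaxParallelism();
    long getAutoWatermarkInterval();
    boolean isObjectReuseEnabled();
}

// ExecutionConfig would then implement the read-only view, and
// Sink#InitContext would only expose a ReadableExecutionConfig.
public class ExecutionConfig implements ReadableExecutionConfig, Serializable { /* ... */ }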

Best,

Konstantin


On Wed, Feb 1, 2023 at 12:28 PM, Lijie Wang <
wangdachui9...@gmail.com> wrote:

> +1 for Option 2, if we can abstract a "ReadableExecutionConfig"
> interface (containing all is/get methods), and let ExecutionConfig implement
> ReadableExecutionConfig
>
> Best,
> Lijie
>
> On Tue, Jan 17, 2023 at 8:39 PM, João Boto  wrote:
>
> > Hi all,
> >
> > As establishing a read-only contract seems to be the consensus approach,
> > talking to Lijie we saw two ways of doing this:
> >
> > Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just
> > like the UnmodifiableConfiguration)
> > Pros:
> > - we have all the get methods
> > - we don't need to change TypeInformation.createSerializer(ExecutionConfig
> > config)
> > Cons:
> > - we have to override 34 methods that modify things
> > - new methods added to ExecutionConfig will need to be overridden in
> > UnmodifiableExecutionConfig
> >
> >
> > Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig.
> > Pros:
> > - it's a new class, so we don't need to override anything
> > - modifications to ExecutionConfig don't affect this class
> > Cons:
> > - need to change TypeInformation, adding
> > createSerializer(UnmodifiableExecutionConfig config)
> > - need to add all get methods, or only what is needed (this could be a pro)
> >
> >
> > Which option do you think is better?
> >
> >
> >
> > On 2023/01/13 14:15:04 Joao Boto wrote:
> > > Hi flink devs,
> > >
> > > I'd like to start a discussion thread for FLIP-287[1].
> > > This comes from an offline discussion with @Lijie Wang, from FLIP-239[2],
> > > especially for the sink[3].
> > >
> > > Basically, to expose the ExecutionConfig and JobID on SinkV2#InitContext.
> > > These changes are necessary to correctly migrate the current sinks
> > > (like JdbcSink, KafkaTableSink, and so on) to SinkV2, as they rely on
> > > RuntimeContext.
> > >
> > > Comments are welcome!
> > > Thanks,
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> > >
> >
>


-- 
https://twitter.com/snntrable
https://github.com/knaufk


Re: [DISCUSS] Incorporate performance regression monitoring into routine process

2023-02-03 Thread David Anderson
+1

I don't have anything substantive to add, but I want to express how pleased
I am to see this conversation happening.

David

On Thu, Feb 2, 2023 at 5:09 AM Martijn Visser 
wrote:

> Hi all,
>
> +1 for the overall proposal. My feedback matches with what Matthias
> has already provided earlier.
>
> Best regards,
>
> Martijn
>
> On Tue, Jan 31, 2023 at 7:01 PM, Matthias Pohl wrote:
> >
> > Thanks for the effort you put into this discussion, Yanfei.
> >
> > For me, the regression tests are similar to test instabilities. In this
> > sense, I agree with what Piotr and Jing said about it:
> > - It needs to be identified and fixed as soon as possible to avoid the
> > change affecting other contributions (e.g. hiding other regressions) and
> > making it harder to revert them.
> > - Contributors should be made aware of regressions that are caused by
> their
> > commits during the daily development. They should be enabled to
> > pro-actively react to instabilities/regressions that were caused by their
> > contributions. The available Slack channel and ML lists provide good
> tools
> > to enable contributors to be proactive.
> >
> > My experience is that contributors are quite proactive when it comes to
> > resolving test instabilities on master. Still, a dedicated role for
> > watching over CI is necessary to identify issues that slipped through.
> The
> > same applies to performance tests. That said, I could imagine that for
> > performance tests, the pro-activeness of contributors is lower because
> > quite a few changes simply don't affect performance. One idea to
> > raise awareness might be to mention the performance tests in the PR
> > template (maybe next to some of the yes/no questions where a yes might
> > indicate that the performance is affected).
> >
> > About making monitoring of performance tests part of the release
> > management: Right now, watching CI for test instabilities and pinging
> > contributors on issues is already a task for release managers. Extending
> > this responsibility to also check the regression tests seems natural.
> >
> > Yanfei's write-up is a good proposal for general performance test
> > guidelines. It will help contributors and release managers alike. We
> could
> > integrate it into the release testing documentation [1]. I'm wondering
> > whether we would want to add a new Jira label to group these kinds of
> Jira
> > issues analogously to test-instabilities [2].
> >
> > But that said, I want to repeat the idea of organizing the release
> > management documentation in a way that we have not only general release
> > managers with a bunch of different tasks but dedicated roles within the
> > release management. Roles I could imagine based on our experience from
> the
> > past releases are:
> > - CI monitoring: Watching the #builds Slack channel for test
> instabilities
> > - Regression test monitoring: Watching the #flink-dev-benchmarks for
> > regressions
> > - Coordination: Announcements, documentation, organizational stuff (e.g.
> > release call)
> >
> > Having dedicated roles might help find volunteers for the release
> > management. Volunteers could sign up for a dedicated role. Each role
> would
> > provide a clear set of tasks/responsibilities. They don't restrict us in
> > any way because multiple roles can be also fulfilled by a single person.
> > It's just about cutting the tasks into smaller chunks to make it less
> > overwhelming.
> >
> > Matthias
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/Release+Management+and+Feature+Plan
> > [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process
> >
> > On Tue, Jan 31, 2023 at 5:16 PM Jing Ge 
> wrote:
> >
> > > Hi Yanfei,
> > >
> > > Thanks for your proposal and effort of driving it.
> > >
> > > I really like the document on how to deal with performance regressions.
> > > It will coach more developers to be able to work with them. I would
> > > suggest making more developers aware of performance regressions during
> > > daily development, not only the release managers. A document like the
> > > one you drafted is the key to getting them involved. It would be great
> > > if the document could be extended to cover all types of regressions,
> > > case by case. It would be a one-time effort with a long-time benefit.
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Jan 31, 2023 at 12:10 PM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > @Dong, I think the drawback of your proposal is that it wouldn't detect
> > > > a visible performance regression within the benchmark noise, while this
> > > > should be doable with a large enough number of samples.
> > > >
> > > > @Yanfei, gathering medians would have basically the same problems with
> > > > how to deal with two consecutive performance changes. Another issue is
> > > > that I think it would be great to have an algorithm, where you can enter
> > > > what's
> 

Re: Need help how to use Table API to join two Kafka streams

2023-02-03 Thread Amir Hossein Sharifzadeh
Thank you very much, Yuxia!

ssn stands for the social security number here (it was just an example but
it can be any field).

Best,
Amir


On Fri, Feb 3, 2023 at 5:45 AM yuxia  wrote:

> Hi, Amir.
> It may look like the following Scala code:
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn
> string) WITH ('connector' = 'kafka', ...)")
> tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn
> string) WITH ('connector' = 'kafka', ...)")
>
> val t1 = tableEnv.from("s1")
> // you will need to rename the join field, otherwise you'll get
> // "org.apache.flink.table.api.ValidationException: Ambiguous column name:
> // ssn".
> val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
> val result = t1.join(t3).where($"ssn" === $"ssn1")
>
> Also, you can refer here for more detail[1].
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Amir Hossein Sharifzadeh" 
> To: "dev" 
> Sent: Friday, February 3, 2023, 4:45:08 AM
> Subject: Need help how to use Table API to join two Kafka streams
>
> Hello,
>
> I have a Kafka producer and a Kafka consumer that produce and consume
> multiple records, respectively. You can think of two datasets here. Both
> datasets have a similar structure but carry different data.
>
> I want to use the Table API to join two Kafka streams while I
> consume them. For example, data1.ssn == data2.ssn.
>
> Constraints:
> I don't want to change my producer or use FlinkKafkaProducer.
>
> Thank you very much.
>
> Best,
> Amir
>


[RESULT][VOTE] FLIP-285: Refactoring LeaderElection to make Flink support multi-component leader election out-of-the-box

2023-02-03 Thread Matthias Pohl
The vote on FLIP-285 is done. The proposal is approved in [1] after the
discussion happened in [2].

There are 6 approving votes, 3 of which are binding:
* Chesnay Schepler (binding)
* Rui Fan (non-binding)
* Weijie Guo (non-binding)
* Conrad Jam (non-binding)
* Yang Wang (binding)
* David Morávek (binding)

Nobody rejected the proposal. I'm gonna go ahead and work on this issue for
1.18.

Best,
Matthias

[1] https://lists.apache.org/thread/hdkz4v5phqwbr8b8971kqot31om8osfq
[2] https://lists.apache.org/thread/qrl881wykob3jnmzsof5ho8b9fgkklpt

-- 

*Matthias Pohl*
Open Source Software Engineer, *Aiven*
matthias.p...@aiven.io | +49 170 9869525
aiven.io
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B


Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-02-03 Thread Piotr Nowojski
Hi Shammon,

Thanks for pushing the topic further. I'm not sure how this new proposal is
supposed to work. How should the timestamp barrier interact with event
time and watermarks? Or is the timestamp barrier supposed to completely
replace watermarks?

> stateful and temporal operators should align them (records) according to
their timestamp field.

Are you proposing that all of the inputs to stateful operators would have
to be sorted?

> There are three states in a table for a specific transaction: PreCommit,
> Commit and Snapshot

Can you explain why you need those 3 states? Why can committed records
be rolled back?

>> 10. Have you considered proposing a general consistency mechanism instead
>> of restricting it to TableStore+ETL graphs? For example, it seems to me
to
>> be possible and valuable to define instead the contract that
sources/sinks
>> need to implement in order to participate in globally consistent
snapshots.
>
> A general consistency mechanism is cool! In my mind, the overall
> `consistency system` consists of three components: Streaming & Batch ETL,
> Streaming & Batch Storage and MetaService. MetaService is decoupled from
> Storage Layer, but it stores consistency information in persistent
storage.
> It can be started as an independent node or a component in a large Flink
> cluster. In the FLIP we use TableStore as the Storage Layer. As you
> mentioned, we plan to implement specific source and sink on the TableStore
> in the first phase, and may consider other storage in the future

I'm not sure if I follow. Generally speaking, why do we need MetaService at
all? Why can we only support writes to and reads from TableStore, and not
any source/sink that implements some specific interface?

Best,
Piotrek

On Sun, Jan 29, 2023 at 12:11 PM, Shammon FY  wrote:

> Hi @Vicky
>
> Thank you for your suggestions about consistency; they're very helpful!
>
> I have updated the examples and consistency types[1] in the FLIP. In
> general, I regard the Timestamp Barrier processing as a transaction and
> divide the data consistency supported in the FLIP into three types:
> 1. Read Uncommitted: Read data from tables even when a transaction is not
> committed.
> 2. Read Committed: Read data from tables according to the committed
> transaction.
> 3. Repeatable Read: Read data from tables according to the committed
> transaction in snapshots.
>
> You can get more information from the updated FLIP. Looking forward to your
> feedback, THX
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-DataConsistencyType
>
> Best,
> Shammon
>
>
> On Sat, Jan 28, 2023 at 4:42 AM Vasiliki Papavasileiou
>  wrote:
>
> > Hi Shammon,
> >
> >
> > Thank you for opening this FLIP which is very interesting and such an
> > important feature to add to the Flink ecosystem. I have a couple of
> > suggestions/questions:
> >
> >
> >
> >-
> >
> >Consistency is a very broad term with different meanings. There are
> many
> >variations between the two extremes of weak and strong consistency
> that
> >tradeoff latency for consistency. https://jepsen.io/consistency It
> > would
> >be great if we could devise an approach that allows the user to choose
> >which consistency level they want to use for a query.
> >
> >
> > Example: In your figure where you have a DAG, assume a user queries only
> > Table1 for a specific key. Then, a failure happens and the table restores
> > from a checkpoint. The user issues the same query, looking up the same
> key.
> > What value does she see? With monotonic-reads, the system guarantees that
> > she will only see the same or newer values but not older, hence will not
> > experience time-travel. This is a very useful property for a system to
> > have, albeit at the weaker end of consistency guarantees. But it is a
> > good stepping stone.
> >
> >
> > Another example, assume the user queries Table1 for key K1 and gets the
> > value V11. Then, she queries Table2 that is derived from Table1 for the
> > same key, K1, that returns value V21. What is the relationship between
> V21
> > and V11? Is V21 derived from V11 or can it be an older value V1 (the
> > previous value of K1)? What if value V21 is not yet in table Table2? What
> > should she see when she queries Table1? Should she see the value V11 or
> > not?
> > Should the requirement be that a record is not visible in any of the
> tables
> > in a DAG unless it is available in all of them?
> >
> >
> >
> >-
> >
> >It would we good to have a set of examples with consistency anomalies
> >that can happen (like the examples above) and what consistency levels
> we
> >want the system to offer to prevent them.
> >Moreover, for each such example, it would be good to have a
> description
> >of how the approach (Timestamp Barriers) will work in practice to
> > prevent
> 

Re: [VOTE] FLIP-285: Refactoring LeaderElection to make Flink support multi-component leader election out-of-the-box

2023-02-03 Thread Matthias Pohl
Thanks for your participation. I'm closing this vote and will announce the
results in a separate email.

Matthias

On Fri, Feb 3, 2023 at 3:26 PM David Morávek  wrote:

> Thanks for the detailed FLIP, Matthias; this will simplify the HA code-base
> significantly.
>
> +1 (binding)
>
> Best,
> D.
>
> On Tue, Jan 31, 2023 at 5:22 AM Yang Wang  wrote:
>
> > +1 (Binding)
> >
> > Best,
> > Yang
> >
> > > On Tue, Jan 31, 2023 at 12:09 PM, ConradJam  wrote:
> >
> > > +1 non-binding
> > >
> > > On Wed, Jan 25, 2023 at 5:34 PM, Matthias Pohl  wrote:
> > >
> > > > Hi everyone,
> > > > After the discussion thread [1] on FLIP-285 [2] didn't bring up any
> new
> > > > items, I want to start voting on FLIP-285. This FLIP will not only
> > align
> > > > the leader election code base again through FLINK-26522 [3]. I also
> > plan
> > > to
> > > > improve the test coverage for the leader election as part of this
> > change
> > > > (covered in FLINK-30338 [4]).
> > > >
> > > > The vote will remain open until at least Jan 30th (at least 72 hours)
> > > > unless there are some objections or insufficient votes.
> > > >
> > > > Best,
> > > > Matthias
> > > >
> > > > [1] https://lists.apache.org/thread/qrl881wykob3jnmzsof5ho8b9fgkklpt
> > > > [2]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box
> > > > [3] https://issues.apache.org/jira/browse/FLINK-26522
> > > > [4] https://issues.apache.org/jira/browse/FLINK-30338
> > > >
> > > > --
> > > > Matthias Pohl
> > > > Software Engineer, Aiven
> > > > matthias.p...@aiven.io | aiven.io
> > > > Aiven Deutschland GmbH
> > > > Immanuelkirchstraße 26, 10405 Berlin
> > > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > > Amtsgericht Charlottenburg, HRB 209739 B
> > > >
> > >
> > >
> > > --
> > > Best
> > >
> > > ConradJam
> > >
> >
>


Re: [VOTE] FLIP-285: Refactoring LeaderElection to make Flink support multi-component leader election out-of-the-box

2023-02-03 Thread David Morávek
Thanks for the detailed FLIP, Matthias; this will simplify the HA code-base
significantly.

+1 (binding)

Best,
D.

On Tue, Jan 31, 2023 at 5:22 AM Yang Wang  wrote:

> +1 (Binding)
>
> Best,
> Yang
>
> > On Tue, Jan 31, 2023 at 12:09 PM, ConradJam  wrote:
>
> > +1 non-binding
> >
> > On Wed, Jan 25, 2023 at 5:34 PM, Matthias Pohl  wrote:
> >
> > > Hi everyone,
> > > After the discussion thread [1] on FLIP-285 [2] didn't bring up any new
> > > items, I want to start voting on FLIP-285. This FLIP will not only align
> > > the leader election code base again through FLINK-26522 [3]; I also plan
> > > to improve the test coverage for the leader election as part of this
> > > change (covered in FLINK-30338 [4]).
> > >
> > > The vote will remain open until at least Jan 30th (at least 72 hours)
> > > unless there are some objections or insufficient votes.
> > >
> > > Best,
> > > Matthias
> > >
> > > [1] https://lists.apache.org/thread/qrl881wykob3jnmzsof5ho8b9fgkklpt
> > > [2]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box
> > > [3] https://issues.apache.org/jira/browse/FLINK-26522
> > > [4] https://issues.apache.org/jira/browse/FLINK-30338
> > >
> > > --
> > > Matthias Pohl
> > > Software Engineer, Aiven
> > > matthias.p...@aiven.io | aiven.io
> > > Aiven Deutschland GmbH
> > > Immanuelkirchstraße 26, 10405 Berlin
> > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > Amtsgericht Charlottenburg, HRB 209739 B
> > >
> >
> >
> > --
> > Best
> >
> > ConradJam
> >
>


[jira] [Created] (FLINK-30898) Do not include and build examples in operator image

2023-02-03 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30898:
--

 Summary: Do not include and build examples in operator image
 Key: FLINK-30898
 URL: https://issues.apache.org/jira/browse/FLINK-30898
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.4.0
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.4.0


The docker build has slowed down substantially over time. We include many 
things in the image that are not necessary.

We should not include examples at all.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30897) Avoid timeouts in JUnit tests

2023-02-03 Thread lincoln lee (Jira)
lincoln lee created FLINK-30897:
---

 Summary: Avoid timeouts in JUnit tests
 Key: FLINK-30897
 URL: https://issues.apache.org/jira/browse/FLINK-30897
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: lincoln lee


As our [testing 
guideline|https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests]
 says, we should 'Avoid timeouts in JUnit tests' and rather depend on the global 
timeout in Azure. There are 10 ITCases throughout the project that use the 
'Timeout' rule to set local timeouts. We need to check, one by one, whether we 
can remove this dependency.

List of related test classes:

{code}

flink-end-to-end-tests-common-kafka  (1 usage found)
    org.apache.flink.tests.util.kafka  (1 usage found)
        SQLClientSchemaRegistryITCase.java  (1 usage found)
            78: @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
flink-glue-schema-registry-avro-test_2.12  (1 usage found)
    org.apache.flink.glue.schema.registry.test  (1 usage found)
        GlueSchemaRegistryAvroKinesisITCase.java  (1 usage found)
            74: @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
flink-glue-schema-registry-json-test  (1 usage found)
    org.apache.flink.glue.schema.registry.test.json  (1 usage found)
        GlueSchemaRegistryJsonKinesisITCase.java  (1 usage found)
            68: @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
flink-runtime  (1 usage found)
    org.apache.flink.runtime.io.disk  (1 usage found)
        BatchShuffleReadBufferPoolTest.java  (1 usage found)
            41: @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
flink-streaming-java  (1 usage found)
    org.apache.flink.streaming.api.operators.async  (1 usage found)
        AsyncWaitOperatorTest.java  (1 usage found)
            117: @Rule public Timeout timeoutRule = new Timeout(100, TimeUnit.SECONDS);
flink-tests  (5 usages found)
    org.apache.flink.runtime.operators.lifecycle  (3 usages found)
        BoundedSourceITCase.java  (1 usage found)
            75: @Rule public Timeout timeoutRule = new Timeout(10, TimeUnit.MINUTES);
        PartiallyFinishedSourcesITCase.java  (1 usage found)
            79: @Rule public Timeout timeoutRule = new Timeout(10, TimeUnit.MINUTES);
        StopWithSavepointITCase.java  (1 usage found)
            103: @Rule public Timeout timeoutRule = new Timeout(10, TimeUnit.MINUTES);
    org.apache.flink.test.runtime  (2 usages found)
        JoinDeadlockITCase.java  (1 usage found)
            39: @Rule public Timeout globalTimeout = new Timeout(120 * 1000); // Set timeout for deadlocks
        SelfJoinDeadlockITCase.java  (1 usage found)
            46: @Rule public Timeout globalTimeout = new Timeout(120 * 1000); // Set timeout for deadlocks

{code}
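
For illustration, the change in most of these cases would simply drop the local 
rule and rely on the global watchdog instead (a sketch only; each test needs 
individual review, e.g. the deadlock tests may depend on the local timeout):

{code}
// Before: a local timeout that fails the test itself on a hang, which can
// hide the root cause and bypass the global watchdog's diagnostics.
@Rule public Timeout timeoutRule = new Timeout(10, TimeUnit.MINUTES);

// After: no local rule at all; the global timeout configured in the Azure
// pipeline kills and diagnoses hanging builds instead.
{code}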



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-291: Externalized Declarative Resource Management

2023-02-03 Thread Matthias Pohl
Thanks David for creating this FLIP. It sounds promising and useful to
have. Here are some thoughts from my side (some of them might be rather a
follow-up and not necessarily part of this FLIP):
- I'm wondering whether it makes sense to add some kind of resource ID to
the REST API. This would give Flink a tool to verify the PATCH request of
the external system in a compare-and-set kind of manner. AFAIU, the process
requires the external system to retrieve the resource requirements first
(to retrieve the vertex IDs). A resource ID would be sent along as a
unique identifier for the provided setup. It's essentially the version ID
of the currently deployed resource requirement configuration. Flink doesn't
know whether the external system would use the provided information in some
way to derive a new set of resource requirements for this job. The
subsequent PATCH request with updated resource requirements would include
the previously retrieved resource ID. The PATCH call would fail if
there was a concurrent PATCH call in between indicating to the external
system that the resource requirements were concurrently updated.
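Purely to illustrate the compare-and-set idea (this is not the FLIP's actual
schema; all field names and values here are invented):

GET /jobs/<job-id>/resource-requirements
  => { "resourceId": "42", "<vertex-id>": { "parallelism": { "lowerBound": 1, "upperBound": 4 } } }

PATCH /jobs/<job-id>/resource-requirements
  { "resourceId": "42", "<vertex-id>": { "parallelism": { "lowerBound": 1, "upperBound": 8 } } }
  => rejected if "42" is no longer the current resource ID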
- How often do we allow resource requirements to be changed? That question
might make my previous comment on the resource ID obsolete because we could
just make any PATCH call fail if there was a resource requirement update
within a certain time frame before the request. But such a time period is
something we might want to make configurable then, I guess.
- Versioning the JobGraph in the JobGraphStore rather than overwriting it
might be an idea. This would enable us to provide resource requirement
changes in the UI or through the REST API. It is related to a problem
around keeping track of the exception history within the AdaptiveScheduler
and also having to consider multiple versions of a JobGraph. But for that
one, we use the ExecutionGraphInfoStore right now.
- Updating the JobGraph in the JobGraphStore makes sense. I'm just
wondering whether we bundle two things together that are actually separate:
The business logic and the execution configuration (the resource
requirements). I'm aware that this is not a flaw of the current FLIP but
rather something that was not necessary to address in the past because the
JobGraph was kind of static. I don't remember whether that was already
discussed while working on the AdaptiveScheduler for FLIP-160 [1]. Maybe,
I'm missing some functionality here that requires us to have everything in
one place. But it feels like updating the entire JobGraph which could be
actually a "config change" is not reasonable. ...also considering the
amount of data that can be stored in a ConfigMap/ZooKeeper node if
versioning the resource requirement change as proposed in my previous item
is an option for us.
- Updating the JobGraphStore means adding more requests to the HA backend
API. There were some concerns shared in the discussion thread [2] for
FLIP-270 [3] on pressuring the k8s API server in the past with too many
> calls. Even though it's more likely to be caused by checkpointing, I still
wanted to bring it up. We're working on a standardized performance test to
prepare going forward with FLIP-270 [3] right now.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
[2] https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints

On Fri, Feb 3, 2023 at 10:31 AM ConradJam  wrote:

> Hi David:
>
> Thank you for driving this FLIP, which helps reduce Flink downtime.
>
> For this FLIP, I would like to share a few ideas:
>
>
>    - when the number of slots is insufficient, can we stop users from
>    rescaling or throw something to tell the user "not enough available slots
>    to upgrade, please check your available slots"? Or we could have a
>    switch (true/false) to allow this behavior
>
>
>    - when the user upgrades the job vertex parallelism, I want to have an
>    interface to query the current status of the parallelism update, so that
>    the user or a program can understand the current status. This would also
>    help with *[1] Flink K8S Operator* management
>
>
> {
>   status: Failed
>   reason: "not enough available slots to upgrade, please check your available slots"
> }
>
>
>
>    - *Pending*: the job has joined the upgrade queue and will be updated later
>    - *Rescaling*: the job is currently rescaling; wait for it to finish
>    - *Finished*: the rescaling is done
>    - *Failed*: something went wrong, so this job cannot be upgraded
>
> I would like to add the above content to the FLIP. What do you think?
>
>
>1.
>https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>
>
> On Fri, Feb 3, 2023 at 4:42 PM, David Morávek  wrote:
>
> > Hi everyone,
> >
> > This FLIP [1] introduces a new REST API for declaring resource
> 

[jira] [Created] (FLINK-30896) Reduce usage of CatalogViewImpl in planner

2023-02-03 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-30896:
---

 Summary: Reduce usage of CatalogViewImpl in planner
 Key: FLINK-30896
 URL: https://issues.apache.org/jira/browse/FLINK-30896
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Sergey Nuyanzin


Most of the work was done under 
https://issues.apache.org/jira/browse/FLINK-21801

 

However, there are still some usages of {{CatalogViewImpl}}.
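
For illustration, the remaining usages would typically move from the 
implementation class to the interface factory (a sketch; this assumes the 
{{CatalogView.of}} factory that came out of FLINK-21801, and the exact 
parameter order may differ):

{code}
// before
CatalogView view = new CatalogViewImpl(originalQuery, expandedQuery, schema, options, comment);

// after
CatalogView view = CatalogView.of(schema, comment, originalQuery, expandedQuery, options);
{code}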



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Need help how to use Table API to join two Kafka streams

2023-02-03 Thread yuxia
Hi, Amir.
It may look like the following Scala code:

tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")

val t1 = tableEnv.from("s1")
// you will need to rename the join field, otherwise you'll get
// "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
val result = t1.join(t3).where($"ssn" === $"ssn1")
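
For completeness, the same join can also be written directly in SQL, where
qualifying the columns avoids the ambiguity (a sketch of the equivalent query):

SELECT s1.id, s1.ssn
FROM s1
JOIN s2 ON s1.ssn = s2.ssn;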

Also, you can refer here for more detail[1].
[1]  
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins

Best regards,
Yuxia

- Original Message -
From: "Amir Hossein Sharifzadeh" 
To: "dev" 
Sent: Friday, February 3, 2023, 4:45:08 AM
Subject: Need help how to use Table API to join two Kafka streams

Hello,

I have a Kafka producer and a Kafka consumer that produce and consume
multiple records, respectively. You can think of two datasets here. Both
datasets have a similar structure but carry different data.

I want to use the Table API to join two Kafka streams while I
consume them. For example, data1.ssn == data2.ssn.

Constraints:
I don't want to change my producer or use FlinkKafkaProducer.

Thank you very much.

Best,
Amir


[jira] [Created] (FLINK-30895) SlotSharingSlotAllocator may waste slots

2023-02-03 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-30895:


 Summary: SlotSharingSlotAllocator may waste slots
 Key: FLINK-30895
 URL: https://issues.apache.org/jira/browse/FLINK-30895
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0, 1.17.1


The allocator evenly distributes slots across slot sharing groups, independent 
of how many slots the vertices in each group actually need.

This can cause slots to be unused. For example, with 8 available slots and two 
slot sharing groups, one needing 4 slots and the other only 1, the even split 
assigns 4 slots to each group, leaving 3 of the second group's slots unused.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-291: Externalized Declarative Resource Management

2023-02-03 Thread ConradJam
Hi David:

Thank you for driving this FLIP, which helps reduce Flink downtime.

For this FLIP, I would like to share a few ideas:


   - when the number of slots is insufficient, can we stop users from
   rescaling or throw something to tell the user "not enough available slots
   to upgrade, please check your available slots"? Or we could have a
   switch (true/false) to allow this behavior


   - when the user upgrades the job vertex parallelism, I want to have an
   interface to query the current status of the parallelism update, so that
   the user or a program can understand the current status. This would also
   help with *[1] Flink K8S Operator* management


{
  status: Failed
  reason: "less avaliable slots to upgrade, please checkout your alivalbe slots"
}



   - *Pending*: the job has joined the upgrade queue and will be updated later
   - *Rescaling*: the job is currently rescaling; wait for it to finish
   - *Finished*: the rescaling is done
   - *Failed*: something went wrong, so this job cannot be upgraded

I would like to add the above content to the FLIP. What do you think?


   1.
   https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/


On Fri, Feb 3, 2023 at 4:42 PM, David Morávek  wrote:

> Hi everyone,
>
> This FLIP [1] introduces a new REST API for declaring resource requirements
> for the Adaptive Scheduler. There seems to be a clear need for this API
> based on the discussion on the "Reworking the Rescale API" [2] thread.
>
> Before we get started, this work is heavily based on the prototype [3]
> created by Till Rohrmann, and the FLIP is being published with his consent.
> Big shoutout to him!
>
> Last but not least, thanks to Chesnay and Roman for the initial reviews and
> discussions.
>
> The best start would be watching a short demo [4] that I've recorded, which
> illustrates newly added capabilities (rescaling the running job, handing
> back resources to the RM, and session cluster support).
>
> The intuition behind the FLIP is being able to define resource requirements
> ("resource boundaries") externally that the AdaptiveScheduler can navigate
> within. This is a building block for higher-level efforts such as an
> external Autoscaler. The natural extension of this work would be to allow
> to specify per-vertex ResourceProfiles.
>
> Looking forward to your thoughts; any feedback is appreciated!
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> [2] https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
> [3] https://github.com/tillrohrmann/flink/tree/autoscaling
> [4] https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view
>
> Best,
> D.
>


-- 
Best

ConradJam


[jira] [Created] (FLINK-30894) Introduce Serializer to serialize internal data structure

2023-02-03 Thread Feng Wang (Jira)
Feng Wang created FLINK-30894:
-

 Summary: Introduce Serializer to serialize internal data structure
 Key: FLINK-30894
 URL: https://issues.apache.org/jira/browse/FLINK-30894
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Feng Wang
Assignee: Feng Wang
 Fix For: table-store-0.4.0


Introduce a Serializer to serialize Table Store's internal data structures. 
Unlike Flink's TypeSerializer, only a few methods are needed here.
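
A possible minimal shape, purely as an illustration (the actual interface is up 
to the implementation; Flink's DataInputView/DataOutputView are used here only 
as stand-ins for whatever I/O abstraction is chosen):

{code}
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

/** A trimmed-down serializer contract: just copy, serialize and deserialize. */
public abstract class Serializer<T> implements Serializable {

    public abstract T copy(T from);

    public abstract void serialize(T record, DataOutputView target) throws IOException;

    public abstract T deserialize(DataInputView source) throws IOException;
}
{code}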



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Reworking the Rescale API

2023-02-03 Thread Chesnay Schepler

My primary concern here is to be able to rescale with upfront reservation of 
resources before restarting the job, so the job doesn't get stuck in case of 
resource constraints.


Not sure I follow. The AdaptiveScheduler only rescales when it has already 
acquired the slots that it needs.

> This is a blocker from my side. Why do we have that restriction?

We just didn't bother fixing it initially. It should be easy to fix.

On 02/02/2023 18:29, Maximilian Michels wrote:

I fully agree that in-place scaling is a much harder problem which is
out of the scope for now. My primary concern here is to be able to
rescale with upfront reservation of resources before restarting the
job, so the job doesn't get stuck in case of resource constraints.


Unused slots: If the max parallelism for slot sharing groups is not equal, 
slots offered to Adaptive Scheduler might be unused.

From: 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/#limitations-1

This is a blocker from my side. Why do we have that restriction?

On Thu, Feb 2, 2023 at 5:03 PM Chesnay Schepler  wrote:

  > If I understand correctly, the adaptive scheduler currently does a
full job restart. Is there any work planned to enable in-place rescaling
with the adaptive scheduler?

Nothing concrete. Sure, it's on a wishlist, but it'd require significant
changes to how the runtime works.
Rescaling stateful operators requires keygroups to be redistributed,
you'd need to be able to change task edges dynamically, roll-back to a
checkpoint without restarting tasks, ...

It's less of a scheduler thing actually.

An earlier step to that would be to allow recovery from an error without
restarting all tasks, which would benefit all schedulers.
But again bit of a moonshot.

  > How well has the adaptive scheduler been tested in production? If we
are intending to use it for rescale operations, I'm a bit concerned
those jobs might show different behavior due to the scheduling than jobs
started with the default scheduler.

I don't think we got a lot of feedback so far.
Outside of the limitations listed on the elastic scaling page (which I
believe we'll address in due time) I'm not aware of any problems.
We haven't run into any issues internally.

On 02/02/2023 12:44, Maximilian Michels wrote:

+1 on improving the scheduler docs.


They never shared a base class since day 1. Are you maybe mixing up the 
AdaptiveScheduler and AdaptiveBatchScheduler?

@Chesnay: Indeed, I had mixed this up. DefaultScheduler and
AdaptiveScheduler only share the SchedulerNG interface while the
DefaultScheduler and the AdaptiveBatchScheduler share a subset of the
code. Too many schedulers :)

Thanks for clarifying the current and the intended feature set of the
adaptive scheduler!

How well has the adaptive scheduler been tested in production? If we
are intending to use it for rescale operations, I'm a bit concerned
those jobs might show different behavior due to the scheduling than
jobs started with the default scheduler.

If I understand correctly, the adaptive scheduler currently does a
full job restart. Is there any work planned to enable in-place
rescaling with the adaptive scheduler?


@max:
- when the user repartitions, we still need to restart the job; can we try to
do this part of the work internally instead of externally and, as
*@konstantin* said, only trigger rescaling when the checkpoint or
retained checkpoint has completed, to minimize reprocessing

@ConradJam: I'm not sure I understand your question. Do you mean when
the partition strategy changes between operators? That shouldn't be
the case for Rescale (except maybe converting ForwardPartitioner to
RescalePartitioner). A more advanced rescale API could allow user
control over this but for now I think it would only support adjusting
parallelism of vertices.

-Max

On Thu, Feb 2, 2023 at 6:44 AM weijie guo  wrote:

Hi David,

Sorry I'm late to join discuss.

+1 for having a more structured doc about the scheduler ecosystem, and I can 
help fill in the details about the batch part.

Best regards,

Weijie



On Wed, Feb 1, 2023 at 10:38 PM, David Morávek  wrote:

It makes sense to give the whole "scheduler ecosystem," not just the
adaptive scheduler, a little bit more structure in the docs. We already
have 4 different schedulers (Default, Adaptive, AdaptiveBatch,
AdaptiveBatchSpeculative), and it becomes quite confusing since the details
are scattered around the docs. Maybe having a "Job Schedulers" subpage, the
same way as we have for "Resource Providers" could do the trick.

I should be able to fill in the details about the streaming ones, but I
will probably need some help with the batch ones.

As for the first FLIP, it's already prepared and we should be able to
publish it by Friday.

Best,
D.


On Wed, Feb 1, 2023 at 9:56 AM Gyula Fóra  wrote:


Chesnay, David:

Thank you guys for the extra information. We were clearly missing some
context here around the scheduler related efforts and the currently

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

2023-02-03 Thread Ran Tao
Hi all, I have updated FLIP-278 [1]. I think all problems and comments have
been addressed:

1. About the option prefix: we use identifiers.
2. Table API implementation and demo.
3. About the switched start position (the hybrid source uses it to
automatically switch from the previous source to the next one).

More details can be found in the draft PR [2]; it works well.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
[2] https://github.com/apache/flink/pull/21841

On Mon, Dec 19, 2022 at 4:16 PM, Ran Tao  wrote:

> A correction to the demo below; it should be:
> childSources.get(sourceIndex).setStartTimestamp(switchedTimestamp);
>
> On Mon, Dec 19, 2022 at 4:10 PM, Ran Tao  wrote:
>
>> Hi John, thanks for your comments.
>> Regarding question 2: the "handoff" is used for switching to the next
>> source seamlessly, but it's optional. Not every hybrid source job needs to
>> use this mode.
>>
>> The hybrid source SQL or Table API needs to support two ways, like the
>> DataStream API below. One is a fixed position: the user can specify
>> earliest, latest, specific-offsets, etc. The second way is that the user
>> can also specify a timestamp to let the second source consume the Kafka
>> data from that timestamp (no need to specify earliest, latest or
>> specific-offsets; Flink does this conversion).
>>
>>  * A simple example with FileSource and KafkaSource with fixed Kafka
>>  * start position:
>>  * {@code
>>  * FileSource<String> fileSource =
>>  *     FileSource.forRecordStreamFormat(new TextLineInputFormat(),
>>  *         Path.fromLocalFile(testDir)).build();
>>  * KafkaSource<String> kafkaSource =
>>  *     KafkaSource.builder()
>>  *         .setBootstrapServers("localhost:9092")
>>  *         .setGroupId("MyGroup")
>>  *         .setTopics(Arrays.asList("quickstart-events"))
>>  *         .setDeserializer(
>>  *             KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>  *         .setStartingOffsets(OffsetsInitializer.earliest())
>>  *         .build();
>>  * HybridSource<String> hybridSource =
>>  *     HybridSource.builder(fileSource)
>>  *         .addSource(kafkaSource)
>>  *         .build();
>>  * }
>>  *
>>  * A more complex example with Kafka start position derived from
>> previous source:
>>  *
>>  * {@code
>>  * HybridSource<String> hybridSource =
>>  * HybridSource.builder(fileSource)
>>  * .addSource(
>>  * switchContext -> {
>>  *   StaticFileSplitEnumerator previousEnumerator =
>>  *   switchContext.getPreviousEnumerator();
>>  *   // how to get timestamp depends on specific enumerator
>>  *   long timestamp = previousEnumerator.getEndTimestamp();
>>  *   OffsetsInitializer offsets =
>>  *   OffsetsInitializer.timestamp(timestamp);
>>  *   KafkaSource<String> kafkaSource =
>>  *   KafkaSource.builder()
>>  *   .setBootstrapServers("localhost:9092")
>>  *   .setGroupId("MyGroup")
>>  *   .setTopics(Arrays.asList("quickstart-events"))
>>  *   .setDeserializer(
>>  *
>> KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>  *   .setStartingOffsets(offsets)
>>  *   .build();
>>  *   return kafkaSource;
>>  * },
>>  * Boundedness.CONTINUOUS_UNBOUNDED)
>>  * .build();
>>  * }
>>
>> Currently, Flink's SplitEnumerator interface does not expose
>> getEndTimestamp(). I think if we want to implement the "handoff" way, we
>> need to let SplitEnumerator expose this method.
>> Then the question is: if we get the previous endTimestamp, how do we set it
>> back? We can't build the KafkaSource instance ourselves because hybrid is a
>> common implementation.
>> I think we need to add a method, for example startTimestamp(), to the new
>> Source. Then we can implement this:
>>
>> Switched-start-position demo:
>>
>> HybridSource.HybridSourceBuilder builder =
>>         HybridSource.builder(childSources.get(0));
>> for (int i = 1; i < childSources.size(); i++) {
>>     final int sourceIndex = i;
>>     Boundedness boundedness =
>>             childSources.get(sourceIndex).getBoundedness();
>>     builder.addSource(
>>             switchContext -> {
>>                 SplitEnumerator previousEnumerator =
>>                         switchContext.getPreviousEnumerator();
>>                 // how to pass it to Kafka or another connector? We add a
>>                 // method in the new source API like startTimestamp();
>>                 long switchedTimestamp =
>>                         previousEnumerator.getEndTimestamp();
>>                 childSources.setStartTimestamp(switchedTimestamp);
>>                 return childSources.get(sourceIndex);
>>             },
>>             boundedness);
>> }
>> hybridSource = builder.build();
>>
>> e.g. if kafka is end source. 

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

2023-02-03 Thread Ran Tao
Hi Martijn, I have updated the FLIP regarding the Table API & switched start
timestamp.
Thanks.

On Fri, Dec 16, 2022 at 4:59 PM, Martijn Visser  wrote:

> Hi Ran,
>
> For completeness, this is a new thread that was already previously started
> at https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq. I'm
> linking them because I think Timo's comments are relevant to be kept with
> this discussion thread.
>
> I agree with Timo's comments from there that having an index key isn't the
> best option, I would rather have an identifier.
>
> I do wonder how this would work when you want to specify sources from a
> catalog: could you elaborate on that?
>
> What I'm also missing in the FLIP is an example of how to specify the
> starting offset from Kafka. In the DataStream API, there
> is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
> specify that in the SQL landscape?
>
> Last but not least: your examples are all SQL only. How do you propose that
> this works in the Table API?
>
> Best regards,
>
> Martijn
>
> On Thu, Dec 15, 2022 at 9:16 AM Ran Tao  wrote:
>
> > Fyi.
> >
> > This FLIP uses an index as the child source option prefix because we may
> > use the same connector for multiple hybrid child sources.
> > e.g.
> >
> > create table hybrid_source(
> >  f0 varchar,
> >  f1 varchar,
> >  f2 bigint
> > ) with(
> >  'connector'='hybrid',
> >  'sources'='filesystem,filesystem',
> >  '0.path' = '/tmp/a.csv',
> >  '0.format' = 'csv',
> >  '1.path' = '/tmp/b.csv',
> >  '1.format' = 'csv'
> > );
> >
> > In this case, we must distinguish which filesystem connector each format
> > and path option belongs to. But as Timo says, it's not clear. He suggests
> > another way, like this:
> >
> > CREATE TABLE hybrid_source WITH (
> >'sources'='historical;realtime',   -- Config option of type string
> list
> >'historical.connector' = 'filesystem',
> >'historical.path' = '/tmp/a.csv',
> >'historical.format' = 'csv',
> >'realtime.path' = '/tmp/b.csv',
> >'realtime.format' = 'csv'
> > )
> >
> > The `sources` option holds user-defined names instead of the concrete
> > connector types. We use each user-defined name as a prefix, and use
> > `<prefix>.connector` to resolve the concrete connector implementation.
> >
>


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92


[DISCUSS] FLIP-291: Externalized Declarative Resource Management

2023-02-03 Thread David Morávek
Hi everyone,

This FLIP [1] introduces a new REST API for declaring resource requirements
for the Adaptive Scheduler. There seems to be a clear need for this API
based on the discussion on the "Reworking the Rescale API" [2] thread.

Before we get started, this work is heavily based on the prototype [3]
created by Till Rohrmann, and the FLIP is being published with his consent.
Big shoutout to him!

Last but not least, thanks to Chesnay and Roman for the initial reviews and
discussions.

The best start would be watching a short demo [4] that I've recorded, which
illustrates newly added capabilities (rescaling the running job, handing
back resources to the RM, and session cluster support).

The intuition behind the FLIP is being able to define resource requirements
("resource boundaries") externally that the AdaptiveScheduler can navigate
within. This is a building block for higher-level efforts such as an
external Autoscaler. The natural extension of this work would be to allow
to specify per-vertex ResourceProfiles.

Looking forward to your thoughts; any feedback is appreciated!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
[2] https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
[3] https://github.com/tillrohrmann/flink/tree/autoscaling
[4] https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view

Best,
D.