Re: [DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic

2023-01-02 Thread Qingsheng Ren
Thanks for starting the discussion, Wencong!

+1 for the proposal. We can't guarantee that the definition of "record" in
every source implementation matches the one in SourceReaderBase. For
example, the CDC connector source [1] extracts records in the RecordEmitter
instead, so the predefined "numRecordsIn" metric in SourceReaderBase cannot
reflect the actual throughput.

[1]
https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java

Best,
Qingsheng
Ververica (Alibaba)

On Tue, Jan 3, 2023 at 11:50 AM Wencong Liu  wrote:

> Hi devs,
>
>
> I'd like to start a discussion about FLINK-30234: SourceReaderBase should
> provide an option to disable numRecordsIn metric registration [1].
>
>
> As FLINK-30234 describes, the numRecordsIn metric is currently pre-registered
> for all sources in SourceReaderBase. Considering the different
> implementations of source readers, the definition of "record" might differ
> from the one we use in SourceReaderBase, hence numRecordsIn might be
> inaccurate.
>
>
> We could introduce a public option in SourceReaderOptions used in
> SourceReaderBase:
>
>
> source.reader.metric.num_records_in.override: false
>
>
> By default, the source reader will use the numRecordsIn metric in
> SourceReaderBase. If a source reader wants to report the metric by itself,
> it can set source.reader.metric.num_records_in.override to true, which
> disables the registration of numRecordsIn in SourceReaderBase and lets the
> actual implementation report the metric instead.
>
>
> Any thoughts on this?
>
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-30234?jql=project%20%3D%20FLINK
>
>
> Best, Wencong Liu


Re: [DISCUSS] FLIP-280: Introduce a new explain mode to provide SQL advice

2023-01-02 Thread godfrey he
Thanks for driving this discussion.

Do we really need to expose `PlanAnalyzerFactory` as a public interface?
I prefer that we only expose ExplainDetail#ANALYZED_PHYSICAL_PLAN and the
analyzed result, which is enough for users and consistent with the results
of the `explain` method.
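(For context, a minimal usage sketch of that entry point, assuming the FLIP's
proposed ANALYZED_PHYSICAL_PLAN value is added to the existing ExplainDetail
enum; `explainSql` is the existing Table API method, while the query and the
`orders` table are illustrative only:)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainAdviceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Would print the optimized physical plan plus the analyzed advice.
        String result =
                tEnv.explainSql(
                        "SELECT user_id, COUNT(*) FROM orders GROUP BY user_id",
                        ExplainDetail.ANALYZED_PHYSICAL_PLAN); // proposed value
        System.out.println(result);
    }
}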

The plan analyzer classes are in the table planner module, which does not
contain public API
(public interfaces should be defined in the flink-table-api-java module).
And PlanAnalyzer depends on RelNode, which is an internal class of the
planner and is not exposed to users.

Bests,
Godfrey


On Tue, Jan 3, 2023 at 1:43 PM Shengkai Fang wrote:
>
> Sorry, I missed answering the question about the configuration of the
> Analyzer. Users may not need to configure this with SQL statements. In the
> SQL Gateway, users can configure the endpoints with the option
> `sql-gateway.endpoint.type` in the flink-conf.
>
> Best,
> Shengkai
>
> On Tue, Jan 3, 2023 at 12:26 PM Shengkai Fang wrote:
>
> > Hi, Jane.
> >
> > Thanks for bringing this to the discussion. I have some questions about
> > the FLIP:
> >
> > 1. `PlanAnalyzer#analyze` uses the FlinkRelNode as the input. Could you
> > share some thoughts about the motivation? In my experience, users mainly
> > care about 2 things when they develop their job:
> >
> > a. Why does their SQL not work? For example, their streaming SQL contains
> > an OVER window but their ORDER key is not ROWTIME. In this case, we may
> > not have a physical or logical node because, during the optimization, the
> > planner has already thrown an exception.
> >
> > b. Many users care about whether their state is compatible after upgrading
> > their Flink version. In this case, I think the old exec plan and the SQL
> > statement are the user's input.
> >
> > So, I think we should introduce methods like `PlanAnalyzer#analyze(String
> > sql)` and `PlanAnalyzer#analyze(String sql, ExecNodeGraph)` here.
> >
> > 2. I am just curious how other people would add rules to the Advisor. When
> > the number of rules increases, must they all be added to the Flink codebase?
> > 3. How do users configure another advisor?
> >
> > Best,
> > Shengkai
> >
> >
> >
> > On Wed, Dec 28, 2022 at 12:30 PM Jane Chan wrote:
> >
> >> Hi @yuxia, Thank you for reviewing the FLIP and raising questions.
> >>
> >> 1: Is the PlanAnalyzerFactory also expected to be implemented by users
> >> just
> >> > like DynamicTableSourceFactory or other factories? If so, I notice that
> >> in
> >> > the code of PlanAnalyzerManager#registerAnalyzers, the code is as
> >> follows:
> >> > FactoryUtil.discoverFactory(classLoader, PlanAnalyzerFactory.class,
> >> > StreamPlanAnalyzerFactory.STREAM_IDENTIFIER)); IIUC, it'll always find
> >> the
> >> > factory with the name StreamPlanAnalyzerFactory.STREAM_IDENTIFIER; Is
> >> it a
> >> > typo or by design ?
> >>
> >>
> >> This is a really good open question. For the short answer, yes, it is by
> >> design. I'll explain the consideration in more detail.
> >>
> >> The standard procedure to create a custom table source/sink is to
> >> implement
> >> the factory and the source/sink class. There is a strong 1v1 relationship
> >> between the factory and the source/sink.
> >>
> >> SQL                                        | DynamicTableSourceFactory        | Source
> >> create table … with (‘connector’ = ‘foo’)  | #factoryIdentifier.equals(“foo”) | FooTableSource
> >>
> >>
> >> *Apart from that, the custom function module is another kind of
> >> implementation. The factory creates a collection of functions. This is a
> >> 1vN relationship between the factory and the functions.*
> >>
> >> SQL               | ModuleFactory                    | Function
> >> load module ‘bar’ | #factoryIdentifier.equals(“bar”) | A collection of functions
> >>
> >> Back to the plan analyzers, if we choose the first style, we also need to
> >> expose a new SQL syntax to users, like "CREATE ANALYZER foo WITH ..." to
> >> specify the factory identifier. But I think it is too heavy because an
> >> analyzer is an auxiliary tool to help users write better queries, and thus
> >> it should be exposed at the API level rather than the user syntax level.
> >>
> >> As a result, I propose to follow the second style. Then we don't need to
> >> introduce new syntax to create analyzers. Let StreamPlanAnalyzerFactory be
> >> the default factory to create analyzers under the streaming mode, and the
> >> custom analyzers will register themselves in StreamPlanAnalyzerFactory.
> >>
> >> @Override
> >> public List<PlanAnalyzer> createAnalyzers() {
> >>     return Arrays.asList(
> >>             FooAnalyzer.INSTANCE,
> >>             BarAnalyzer.INSTANCE,
> >>             ...);
> >> }
> >>
> >>
> >> > 2: Is there any special reason to make PlanAdvice a final class? Would it
> >> > be better to make it an interface and provide a default implementation?
> >> > My concern is that some users may want to have their own implementation
> >> > of PlanAdvice. But that may be overthinking it. If you think it won't
> >> > bring any problems, I'm also fine with that.
> >>
> >>
> >> The reason 

Re: [DISCUSS] Extending the feature freezing date of Flink 1.17

2023-01-02 Thread Yu Li
+1 for the proposal (extending the 1.17 feature freeze date to Jan 31st).

Best Regards,
Yu


On Tue, 3 Jan 2023 at 15:11, Zhu Zhu  wrote:

> +1 to extend the feature freeze date to Jan 31st.
>
> Thanks,
> Zhu
>
> > On Tue, Jan 3, 2023 at 11:41 AM David Anderson wrote:
> >
> > I'm also in favor of extending the feature freeze to Jan 31st.
> >
> > David
> >
> > On Thu, Dec 29, 2022 at 9:01 AM Leonard Xu  wrote:
> >
> > > Thanks Qingsheng for the proposal, the pandemic has really impacted
> > > development schedules.
> > >
> > > Jan 31st makes sense to me.
> > >
> > >
> > > Best,
> > > Leonard
> > >
> > >
>


Re: [DISCUSS] Adding a option for planner to decide which join reorder rule to choose

2023-01-02 Thread yh z
Hi Benchao,

Thanks for your reply.

Actually, I mistakenly wrote the name "bushy join reorder" as "busy join
reorder". I'm sorry for the trouble this caused. "Bushy join reorder" means
we can build a bushy join tree based on the cost model, whereas Flink can
currently only build a left-deep tree using Calcite's LoptOptimizeJoinRule.
I hope my answers help with the following questions:

For question #1: The biggest advantage of this "bushy join reorder" strategy
over the default Flink left-deep tree strategy is that it can retain all
possible join reorder plans and then select the optimal plan according to
the cost model. This means that the bushy join reorder strategy can be
better combined with the current cost model to get more reasonable join
reorder results. We verified this on the TPC-DS benchmark: with the Spark
plan as a reference, the new bushy join reorder strategy adjusts more TPC-DS
query plans to be consistent with the Spark plan, and the execution time is
significantly reduced. As for optimization latency, this is the problem to
be solved by the option to be introduced in this discussion. When many
tables need to be reordered, the optimization latency increases greatly,
but when the number of tables is below the threshold, the latency is the
same as with LoptOptimizeJoinRule.
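(To make the search-space point concrete, here is a toy, self-contained
sketch of the Selinger-style dynamic programming idea: one memo entry per
table subset, with every two-way split of a subset considered, which is what
admits bushy shapes. The row counts, the fixed join selectivity, and the
cost formula are all invented for illustration; this is not Flink's rule or
cost model.)

import java.util.HashMap;
import java.util.Map;

public class BushyJoinReorderToy {

    public static void main(String[] args) {
        double[] rows = {1_000_000, 50_000, 2_000, 100}; // invented table sizes
        double selectivity = 0.01;                       // invented, same per join
        int full = (1 << rows.length) - 1;

        // bestCost[subset] = cheapest cost to join exactly the tables in subset.
        Map<Integer, Double> bestCost = new HashMap<>();
        for (int i = 0; i < rows.length; i++) {
            bestCost.put(1 << i, 0.0); // scanning a base table is "free" here
        }

        for (int s = 1; s <= full; s++) {
            if (Integer.bitCount(s) < 2) {
                continue;
            }
            // Invented cardinality estimate for the join result of subset s.
            double card = Math.pow(selectivity, Integer.bitCount(s) - 1);
            for (int i = 0; i < rows.length; i++) {
                if ((s & (1 << i)) != 0) {
                    card *= rows[i];
                }
            }
            // Try every split into two disjoint parts; this admits bushy trees,
            // not just left-deep ones. Cost of a join = inputs + output size.
            double best = Double.POSITIVE_INFINITY;
            for (int left = (s - 1) & s; left > 0; left = (left - 1) & s) {
                int right = s ^ left;
                best = Math.min(best, bestCost.get(left) + bestCost.get(right) + card);
            }
            bestCost.put(s, best);
        }
        // All 2^N - 1 non-empty subsets get memoized, which is why a threshold
        // on the table count is needed before falling back to LoptOptimizeJoinRule.
        System.out.printf("tables=%d, memoized subsets=%d, best cost=%.1f%n",
                rows.length, bestCost.size(), bestCost.get(full));
    }
}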

For question #2: According to my research, many compute engines and database
systems have "bushy join reorder" strategies based on dynamic programming.
For example, Spark and PostgreSQL use the same strategy, with the threshold
set to 12. Also, some papers, like [1] and [2], have researched this
strategy, and [2] sets the threshold to 14.

For question #3: The implementation of Calcite's MultiJoinOptimizeBushyRule
is very simple, and it does not store intermediate results at all. So the
Calcite implementation cannot enumerate all possible join reorder results
and cannot be combined with the current cost model to get more reasonable
join reorder results.


[1]
https://courses.cs.duke.edu/compsci516/cps216/spring03/papers/selinger-etal-1979.pdf
[2] https://db.in.tum.de/~radke/papers/hugejoins.pdf



On Tue, Jan 3, 2023 at 12:54 PM Benchao Li wrote:

> Hi Yunhong,
>
> Thanks for driving this~
>
> I haven't gone deep into the implementation details yet. Regarding the
> general description, I would like to ask a few questions first:
>
> #1, Are there any benchmark results about the optimization latency change
> compared to the current approach? In OLAP scenarios, query optimization
> latency is more crucial.
>
> #2, About the term "busy join reorder", are there any other systems that
> also use this term? I know Calcite has a rule[1] which uses the term "bushy
> join".
>
> #3, About the implementation, if this does the same work as Calcite
> MultiJoinOptimizeBushyRule, is it possible to use the Calcite version
> directly or extend it in some way?
>
> [1]
>
> https://github.com/apache/calcite/blob/9054682145727fbf8a13e3c79b3512be41574349/core/src/main/java/org/apache/calcite/rel/rules/MultiJoinOptimizeBushyRule.java#L78
>
> On Thu, Dec 29, 2022 at 2:44 PM yh z wrote:
>
> > Hi, devs,
> >
> > I'd like to start a discussion about adding an option called
> > "table.optimizer.bushy-join-reorder-threshold" for the planner rule while
> > we try to introduce a new bushy join reorder rule [1] into Flink.
> >
> > This join reorder rule is based on dynamic programming [2], which can
> > store all possible intermediate results, and the cost model can then be
> > used to select the optimal join reorder result. Compared with the existing
> > Lopt join reorder rule, the new rule can produce more possible plans, and
> > the result can be more accurate. However, the search space of this rule
> > grows very large as the number of tables increases. So we should introduce
> > an option to limit the expansion of the search space: if the number of
> > tables to be reordered is less than the threshold, the new bushy join
> > reorder rule is used; otherwise, the Lopt rule is used.
> >
> > The default threshold is intended to be 12. One reason is that in the
> > TPC-DS benchmark, when the number of tables exceeds 12, the optimization
> > time becomes very long. The other reason is that it follows relevant
> > engines, like Spark, whose recommended setting is 12 [3].
> >
> > Looking forward to your feedback.
> >
> > [1]  https://issues.apache.org/jira/browse/FLINK-30376
> > [2]
> >
> >
> https://courses.cs.duke.edu/compsci516/cps216/spring03/papers/selinger-etal-1979.pdf
> > [3]
> >
> >
> https://spark.apache.org/docs/3.3.1/configuration.html#runtime-sql-configuration
> >
> > Best regards,
> > Yunhong Zheng
> >
>
>
> --
>
> Best,
> Benchao Li
>


[jira] [Created] (FLINK-30545) The SchemaManager doesn't check 'NOT NULL' specification when committing AddColumn change

2023-01-02 Thread yuzelin (Jira)
yuzelin created FLINK-30545:
---

 Summary: The SchemaManager doesn't check 'NOT NULL' specification 
when committing AddColumn change
 Key: FLINK-30545
 URL: https://issues.apache.org/jira/browse/FLINK-30545
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: yuzelin
 Fix For: table-store-0.4.0


Currently, the table store doesn't support adding a column with a 'NOT NULL' 
specification, but the SchemaManager doesn't check this condition when 
committing an AddColumn change.
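A minimal sketch of the missing guard (all names here are hypothetical; the 
real fix belongs in the SchemaManager's change-validation path):

{code}
// Hypothetical validation; the method and parameter names are illustrative.
final class AddColumnValidator {

    static void validateAddColumn(String columnName, boolean isNullable) {
        if (!isNullable) {
            throw new UnsupportedOperationException(
                    "Table store does not support adding NOT NULL column '"
                            + columnName + "' yet.");
        }
    }

    public static void main(String[] args) {
        validateAddColumn("nullable_col", true);  // passes
        validateAddColumn("required_col", false); // now fails fast
    }
}
{code}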



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Extending the feature freezing date of Flink 1.17

2023-01-02 Thread Zhu Zhu
+1 to extend the feature freeze date to Jan 31st.

Thanks,
Zhu

On Tue, Jan 3, 2023 at 11:41 AM David Anderson wrote:
>
> I'm also in favor of extending the feature freeze to Jan 31st.
>
> David
>
> On Thu, Dec 29, 2022 at 9:01 AM Leonard Xu  wrote:
>
> > Thanks Qingsheng for the proposal, the pandemic has really impacted
> > development schedules.
> >
> > Jan 31st makes sense to me.
> >
> >
> > Best,
> > Leonard
> >
> >


Re: [DISCUSS] FLIP-280: Introduce a new explain mode to provide SQL advice

2023-01-02 Thread Shengkai Fang
Sorry, I missed answering the question about the configuration of the
Analyzer. Users may not need to configure this with SQL statements. In the
SQL Gateway, users can configure the endpoints with the option
`sql-gateway.endpoint.type` in the flink-conf.

Best,
Shengkai

On Tue, Jan 3, 2023 at 12:26 PM Shengkai Fang wrote:

> Hi, Jane.
>
> Thanks for bringing this to the discussion. I have some questions about
> the FLIP:
>
> 1. `PlanAnalyzer#analyze` uses the FlinkRelNode as the input. Could you
> share some thoughts about the motivation? In my experience, users mainly
> care about 2 things when they develop their job:
>
> a. Why does their SQL not work? For example, their streaming SQL contains
> an OVER window but their ORDER key is not ROWTIME. In this case, we may
> not have a physical or logical node because, during the optimization, the
> planner has already thrown an exception.
>
> b. Many users care about whether their state is compatible after upgrading
> their Flink version. In this case, I think the old exec plan and the SQL
> statement are the user's input.
>
> So, I think we should introduce methods like `PlanAnalyzer#analyze(String
> sql)` and `PlanAnalyzer#analyze(String sql, ExecNodeGraph)` here.
>
> 2. I am just curious how other people would add rules to the Advisor. When
> the number of rules increases, must they all be added to the Flink codebase?
> 3. How do users configure another advisor?
>
> Best,
> Shengkai
>
>
>
> On Wed, Dec 28, 2022 at 12:30 PM Jane Chan wrote:
>
>> Hi @yuxia, Thank you for reviewing the FLIP and raising questions.
>>
>> 1: Is the PlanAnalyzerFactory also expected to be implemented by users
>> just
>> > like DynamicTableSourceFactory or other factories? If so, I notice that
>> in
>> > the code of PlanAnalyzerManager#registerAnalyzers, the code is as
>> follows:
>> > FactoryUtil.discoverFactory(classLoader, PlanAnalyzerFactory.class,
>> > StreamPlanAnalyzerFactory.STREAM_IDENTIFIER)); IIUC, it'll always find
>> the
>> > factory with the name StreamPlanAnalyzerFactory.STREAM_IDENTIFIER; Is
>> it a
>> > typo or by design ?
>>
>>
>> This is a really good open question. For the short answer, yes, it is by
>> design. I'll explain the consideration in more detail.
>>
>> The standard procedure to create a custom table source/sink is to
>> implement
>> the factory and the source/sink class. There is a strong 1v1 relationship
>> between the factory and the source/sink.
>>
>> SQL                                        | DynamicTableSourceFactory        | Source
>> create table … with (‘connector’ = ‘foo’)  | #factoryIdentifier.equals(“foo”) | FooTableSource
>>
>>
>> *Apart from that, the custom function module is another kind of
>> implementation. The factory creates a collection of functions. This is a
>> 1vN relationship between the factory and the functions.*
>>
>> SQL               | ModuleFactory                    | Function
>> load module ‘bar’ | #factoryIdentifier.equals(“bar”) | A collection of functions
>>
>> Back to the plan analyzers, if we choose the first style, we also need to
>> expose a new SQL syntax to users, like "CREATE ANALYZER foo WITH ..." to
>> specify the factory identifier. But I think it is too heavy because an
>> analyzer is an auxiliary tool to help users write better queries, and thus
>> it should be exposed at the API level rather than the user syntax level.
>>
>> As a result, I propose to follow the second style. Then we don't need to
>> introduce new syntax to create analyzers. Let StreamPlanAnalyzerFactory be
>> the default factory to create analyzers under the streaming mode, and the
>> custom analyzers will register themselves in StreamPlanAnalyzerFactory.
>>
>> @Override
>> public List<PlanAnalyzer> createAnalyzers() {
>>     return Arrays.asList(
>>             FooAnalyzer.INSTANCE,
>>             BarAnalyzer.INSTANCE,
>>             ...);
>> }
>>
>>
>> > 2: Is there any special reason to make PlanAdvice a final class? Would it
>> > be better to make it an interface and provide a default implementation?
>> > My concern is that some users may want to have their own implementation
>> > of PlanAdvice. But that may be overthinking it. If you think it won't
>> > bring any problems, I'm also fine with that.
>>
>>
>> The reason for making PlanAdvice final is that I think users would prefer
>> to implement a custom PlanAnalyzer rather than a custom PlanAdvice.
>> PlanAdvice is a POJO class that represents the analyzed result provided by
>> a PlanAnalyzer.
>>
>>
>> > 3: Is there a way to show only the advice? To me, it seems the advice
>> > will be more useful, and the nodes may contain too many details.
>>
>>
>> The result contains two parts: the optimized physical plan itself + the
>> analysis of the plan.
>>
>> For PlanAdvice with GLOBAL scope, it is possible to do so. But for LOCAL
>> scope, the advice content is specific to certain nodes (e.g., some rel
>> nodes are sensitive to the state TTL configuration). In that situation,
>> the plan cannot be omitted. On the other hand, the plan is
>> necessary from the visualization perspective. During the PoC 

Re: [DISCUSS] Adding a option for planner to decide which join reorder rule to choose

2023-01-02 Thread Benchao Li
Hi Yunhong,

Thanks for driving this~

I haven't gone deep into the implementation details yet. Regarding the
general description, I would like to ask a few questions first:

#1, Are there any benchmark results about the optimization latency change
compared to the current approach? In OLAP scenarios, query optimization
latency is more crucial.

#2, About the term "busy join reorder", are there any other systems that
also use this term? I know Calcite has a rule[1] which uses the term "bushy
join".

#3, About the implementation, if this does the same work as Calcite
MultiJoinOptimizeBushyRule, is it possible to use the Calcite version
directly or extend it in some way?

[1]
https://github.com/apache/calcite/blob/9054682145727fbf8a13e3c79b3512be41574349/core/src/main/java/org/apache/calcite/rel/rules/MultiJoinOptimizeBushyRule.java#L78

On Thu, Dec 29, 2022 at 2:44 PM yh z wrote:

> Hi, devs,
>
> I'd like to start a discussion about adding an option called
> "table.optimizer.bushy-join-reorder-threshold" for the planner rule while
> we try to introduce a new bushy join reorder rule [1] into Flink.
>
> This join reorder rule is based on dynamic programming [2], which can store
> all possible intermediate results, and the cost model can then be used to
> select the optimal join reorder result. Compared with the existing Lopt
> join reorder rule, the new rule can produce more possible plans, and the
> result can be more accurate. However, the search space of this rule grows
> very large as the number of tables increases. So we should introduce an
> option to limit the expansion of the search space: if the number of tables
> to be reordered is less than the threshold, the new bushy join reorder rule
> is used; otherwise, the Lopt rule is used.
>
> The default threshold is intended to be 12. One reason is that in the
> TPC-DS benchmark, when the number of tables exceeds 12, the optimization
> time becomes very long. The other reason is that it follows relevant
> engines, like Spark, whose recommended setting is 12 [3].
>
> Looking forward to your feedback.
>
> [1]  https://issues.apache.org/jira/browse/FLINK-30376
> [2]
>
> https://courses.cs.duke.edu/compsci516/cps216/spring03/papers/selinger-etal-1979.pdf
> [3]
>
> https://spark.apache.org/docs/3.3.1/configuration.html#runtime-sql-configuration
>
> Best regards,
> Yunhong Zheng
>


-- 

Best,
Benchao Li


[jira] [Created] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm

2023-01-02 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30544:
--

 Summary: Speed up finding minimum watermark across all channels by 
introducing heap-based algorithm
 Key: FLINK-30544
 URL: https://issues.apache.org/jira/browse/FLINK-30544
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, every time a task receives a watermark, it tries to update the 
minimum watermark. We use a traversal algorithm to find the minimum watermark 
across all channels (see 
[StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E]
 for details), and the time complexity is O(N), where N is the number of 
channels.

We can optimize this by introducing a heap-based algorithm, reducing the time 
complexity to O(log(N)).
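As a sketch of the idea (using a TreeMap as an ordered multiset rather than a 
literal binary heap, because advancing one channel's watermark requires 
removing its old value efficiently; all names here are illustrative):

{code}
import java.util.Arrays;
import java.util.TreeMap;

// Toy version of the proposed O(log N)-per-update minimum tracking.
class MinWatermarkTracker {

    private final long[] channelWatermarks;
    private final TreeMap<Long, Integer> ordered = new TreeMap<>(); // watermark -> #channels

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        ordered.put(Long.MIN_VALUE, numChannels);
    }

    /** Channel ch advances to the given watermark; returns the new global minimum. */
    long onWatermark(int ch, long watermark) {
        long old = channelWatermarks[ch];
        if (watermark > old) { // watermarks only move forward
            channelWatermarks[ch] = watermark;
            if (ordered.merge(old, -1, Integer::sum) == 0) { // O(log N)
                ordered.remove(old);
            }
            ordered.merge(watermark, 1, Integer::sum);       // O(log N)
        }
        return ordered.firstKey(); // previously found by an O(N) scan
    }

    public static void main(String[] args) {
        MinWatermarkTracker t = new MinWatermarkTracker(3);
        t.onWatermark(0, 10);
        t.onWatermark(1, 5);
        System.out.println(t.onWatermark(2, 7)); // prints 5: channel 1 lags
    }
}
{code}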



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-280: Introduce a new explain mode to provide SQL advice

2023-01-02 Thread Shengkai Fang
Hi, Jane.

Thanks for bringing this to the discussion. I have some questions about the
FLIP:

1. `PlanAnalyzer#analyze` uses the FlinkRelNode as the input. Could you
share some thoughts about the motivation? In my experience, users mainly
care about 2 things when they develop their job:

a. Why does their SQL not work? For example, their streaming SQL contains an
OVER window but their ORDER key is not ROWTIME. In this case, we may not
have a physical or logical node because, during the optimization, the
planner has already thrown an exception.

b. Many users care about whether their state is compatible after upgrading
their Flink version. In this case, I think the old exec plan and the SQL
statement are the user's input.

So, I think we should introduce methods like `PlanAnalyzer#analyze(String
sql)` and `PlanAnalyzer#analyze(String sql, ExecNodeGraph)` here.
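(For illustration, the two entry points could look roughly like the sketch
below; PlanAdvice comes from the FLIP, while the exact signatures and the
ExecNodeGraph parameter are assumptions, not the FLIP's current API. The
stub types only keep the sketch self-contained.)

import java.util.List;

interface PlanAnalyzer {

    // Case (a): analyze a statement that may fail before any plan exists.
    List<PlanAdvice> analyze(String sql);

    // Case (b): check an old exec plan's state compatibility against new SQL.
    List<PlanAdvice> analyze(String sql, ExecNodeGraph oldExecNodeGraph);
}

class PlanAdvice {}    // stub: the advice POJO defined in FLIP-280
class ExecNodeGraph {} // stub: Flink's serialized exec plan representation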

2. I am just curious how other people would add rules to the Advisor. When
the number of rules increases, must they all be added to the Flink codebase?
3. How do users configure another advisor?

Best,
Shengkai



On Wed, Dec 28, 2022 at 12:30 PM Jane Chan wrote:

> Hi @yuxia, Thank you for reviewing the FLIP and raising questions.
>
> 1: Is the PlanAnalyzerFactory also expected to be implemented by users just
> > like DynamicTableSourceFactory or other factories? If so, I notice that
> in
> > the code of PlanAnalyzerManager#registerAnalyzers, the code is as
> follows:
> > FactoryUtil.discoverFactory(classLoader, PlanAnalyzerFactory.class,
> > StreamPlanAnalyzerFactory.STREAM_IDENTIFIER)); IIUC, it'll always find
> the
> > factory with the name StreamPlanAnalyzerFactory.STREAM_IDENTIFIER; Is it
> a
> > typo or by design ?
>
>
> This is a really good open question. For the short answer, yes, it is by
> design. I'll explain the consideration in more detail.
>
> The standard procedure to create a custom table source/sink is to implement
> the factory and the source/sink class. There is a strong 1v1 relationship
> between the factory and the source/sink.
>
> SQL                                        | DynamicTableSourceFactory        | Source
> create table … with (‘connector’ = ‘foo’)  | #factoryIdentifier.equals(“foo”) | FooTableSource
>
>
> *Apart from that, the custom function module is another kind of
> implementation. The factory creates a collection of functions. This is a
> 1vN relationship between the factory and the functions.*
>
> SQL               | ModuleFactory                    | Function
> load module ‘bar’ | #factoryIdentifier.equals(“bar”) | A collection of functions
>
> Back to the plan analyzers, if we choose the first style, we also need to
> expose a new SQL syntax to users, like "CREATE ANALYZER foo WITH ..." to
> specify the factory identifier. But I think it is too heavy because an
> analyzer is an auxiliary tool to help users write better queries, and thus
> it should be exposed at the API level rather than the user syntax level.
>
> As a result, I propose to follow the second style. Then we don't need to
> introduce new syntax to create analyzers. Let StreamPlanAnalyzerFactory be
> the default factory to create analyzers under the streaming mode, and the
> custom analyzers will register themselves in StreamPlanAnalyzerFactory.
>
> @Override
> public List<PlanAnalyzer> createAnalyzers() {
>     return Arrays.asList(
>             FooAnalyzer.INSTANCE,
>             BarAnalyzer.INSTANCE,
>             ...);
> }
>
>
> > 2: Is there any special reason to make PlanAdvice a final class? Would it
> > be better to make it an interface and provide a default implementation?
> > My concern is that some users may want to have their own implementation
> > of PlanAdvice. But that may be overthinking it. If you think it won't
> > bring any problems, I'm also fine with that.
>
>
> The reason for making PlanAdvice final is that I think users would prefer
> to implement a custom PlanAnalyzer rather than a custom PlanAdvice.
> PlanAdvice is a POJO class that represents the analyzed result provided by
> a PlanAnalyzer.
>
>
> > 3: Is there a way to show only the advice? To me, it seems the advice
> > will be more useful, and the nodes may contain too many details.
>
>
> The result contains two parts: the optimized physical plan itself + the
> analysis of the plan.
>
> For PlanAdvice with GLOBAL scope, it is possible to do so. But for LOCAL
> scope, the advice content is specific to certain nodes (e.g., some rel
> nodes are sensitive to the state TTL configuration). In that situation,
> the plan cannot be omitted. On the other hand, the plan is
> necessary from the visualization perspective. During the PoC phase, I made
> some attempts to adapt the Flink Visualizer to illustrate the analyzed
> plan, and it looks like the following pic. I think this is intuitive to
> help users understand their queries and what they can do according to the
> advice.
>
>
>
> 4: I'm curious about what the global advice will look like. Is it
> > possible to provide an example?
>
>
> Here is an example to illustrate the non-deterministic update issue.
>
> create temporary table 

[jira] [Created] (FLINK-30543) Adding more examples for setting up jobs via operator.

2023-01-02 Thread Sriram Ganesh (Jira)
Sriram Ganesh created FLINK-30543:
-

 Summary: Adding more examples for setting up jobs via operator.
 Key: FLINK-30543
 URL: https://issues.apache.org/jira/browse/FLINK-30543
 Project: Flink
  Issue Type: Improvement
Reporter: Sriram Ganesh


Currently, we have only basic examples that show how to run a job via the 
operator. Adding more examples covering all upgrade modes would be more 
helpful.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-282: Introduce Delete & Update API

2023-01-02 Thread Jingsong Li
Thanks yuxia for the FLIP! It looks really good!

I have three comments:

## RowLevelDeleteMode

Can RowLevelDeleteMode be moved up a level?
`SupportsRowLevelDelete.RowLevelDeleteMode` is better than
`SupportsRowLevelDelete.RowLevelDeleteInfo.RowLevelDeleteMode`.
The same applies to `RowLevelUpdateMode`.
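(A stub-only sketch of the two nestings being compared; the member and
constant names below are illustrative, not the FLIP's exact definitions:)

public interface SupportsRowLevelDelete {

    // Option A, as currently proposed: the mode nested inside the info class,
    // i.e. SupportsRowLevelDelete.RowLevelDeleteInfo.RowLevelDeleteMode.
    interface RowLevelDeleteInfo {
        enum RowLevelDeleteMode { DELETED_ROWS, REMAINING_ROWS }
    }

    // Option B, the suggestion above: the mode hoisted one level up,
    // i.e. SupportsRowLevelDelete.RowLevelDeleteMode.
    enum RowLevelDeleteMode { DELETED_ROWS, REMAINING_ROWS }
}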

## Scope of addContextParameter

I see that some of your comments are about the sink, but can you make it
clearer here? What exactly is its scope? For example, is it possible to see
all the other sources and sinks in a topology? In what order are they seen?

## Scope of getScanPurpose

Will all sources see this method? Will there be compatibility issues?
If sources ignore this method, will this cause strange behavior?

What I mean is: should a separate SupportsXxx interface be created here to
provide delete and update?

Best,
Jingsong

On Thu, Dec 29, 2022 at 6:23 PM yuxia  wrote:
>
> Hi, Lincoln Lee;
> 1: Yes, it's a typo; thanks for pointing it out. I have fixed it.
> 2: For stream users, taking delete as an example, they will receive a
> TableException("DELETE TABLE is not supported for streaming mode now");
> update is similar. I have also added this to the FLIP.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Lincoln Lee"
> To: "dev"
> Sent: Wednesday, Dec 28, 2022, 9:50:50 AM
> Subject: Re: [DISCUSS] FLIP-282: Introduce Delete & Update API
>
> Hi yuxia,
>
> Thanks for the proposal! I think it'll be very useful for users in batch
> scenarios to cooperate with external systems.
>
> For the flip I have two questions:
> 1. Is the default method 'default ScanPurpose getScanPurpose();' in the
> ScanContext interface, which has no implementation, a typo?
> 2. For stream users, what exceptions will they receive for these unsupported
> operations?
>
> Best,
> Lincoln Lee
>
>
> On Mon, Dec 26, 2022 at 8:24 PM yuxia wrote:
>
> > Hi, devs.
> >
> > I'd like to start a discussion about FLIP-282: Introduce Delete & Update
> > API[1].
> >
> > Row-Level SQL Delete & Update are becoming more and more important in
> > modern big data workflow. The use cases include deleting a set of rows for
> > regulatory compliance, updating a set of rows for data correction, etc.
> > So, in this FLIP, I want to introduce Delete & Update API to Flink in
> > batch mode. With these interfaces, the external connectors will have the
> > ability to delete & update existing data in the corresponding storages.
> >
> > Looking forwards to your feedback.
> >
> > [1]:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235838061
> >
> >
> > Best regards,
> > Yuxia
> >
> >


[DISCUSS] Allow source readers extending SourceReaderBase to override numRecordsIn report logic

2023-01-02 Thread Wencong Liu
Hi devs,  


I'd like to start a discussion about FLINK-30234: SourceReaderBase should 
provide an option to disable numRecordsIn metric registration [1].


As FLINK-30234 describes, the numRecordsIn metric is currently pre-registered 
for all sources in SourceReaderBase. Considering the different implementations 
of source readers, the definition of "record" might differ from the one we use 
in SourceReaderBase, hence numRecordsIn might be inaccurate.


We could introduce a public option in SourceReaderOptions used in 
SourceReaderBase:


source.reader.metric.num_records_in.override: false


By default, the source reader will use the numRecordsIn metric in 
SourceReaderBase. If a source reader wants to report the metric by itself, it 
can set source.reader.metric.num_records_in.override to true, which disables 
the registration of numRecordsIn in SourceReaderBase and lets the actual 
implementation report the metric instead.
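(A minimal sketch of how the option could be declared, using Flink's
ConfigOptions builder; the wiring inside SourceReaderBase is only indicated
by the comment and is an assumption, not the actual code:)

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SourceReaderMetricOptions {

    public static final ConfigOption<Boolean> NUM_RECORDS_IN_OVERRIDE =
            ConfigOptions.key("source.reader.metric.num_records_in.override")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "When true, SourceReaderBase skips registering numRecordsIn "
                                    + "and the concrete reader reports the metric itself.");

    // Illustrative wiring inside SourceReaderBase:
    //   if (!config.get(NUM_RECORDS_IN_OVERRIDE)) {
    //       numRecordsInCounter =
    //               metricGroup.getIOMetricGroup().getNumRecordsInCounter();
    //   }
}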


Any thoughts on this?


[1]  https://issues.apache.org/jira/browse/FLINK-30234?jql=project%20%3D%20FLINK


Best, Wencong Liu

Re: [EXTERNAL] Re: [DISCUSS] Add Flink Web3 Connector

2023-01-02 Thread Junyao Huang
Hi Martin,

Thanks for the replies.
But actually, Web3 represents all kinds of blockchain nodes,
like https://github.com/ethereum/go-ethereum, which will release Ethereum 2.0
this year.


It will cover sharding & indexing, like Elasticsearch & Kafka.

Bitcoin, Ethereum, Solana solutions are called Web3 in this era.

So I think it is still valuable.

Regards,
Junyao



From: Martijn Visser 
Sent: Tuesday, December 20, 2022 16:31
To: dev@flink.apache.org ; Junyao Huang 

Subject: [EXTERNAL] Re: [DISCUSS] Add Flink Web3 Connector


Hi Junyao,

I don't see value in this for the Apache Flink community. You're stating
this as a 'Web3 connector' but are only referring to one specific project.
If that project needs a Flink connector, I think it should be built and
maintained by that project, not by the Flink community.

Best regards,

Martijn

On Tue, Dec 20, 2022 at 6:11 AM Junyao Huang
 wrote:

> Web3 is very hot. If you search GitHub for open-source blockchain
> explorers, the project with the most stars is blockscout
> (https://github.com/blockscout/blockscout), which uses Elixir as a parallel
> engine to sync blocks from a blockchain node into a file (CSV format). I
> think Flink is the best solution for this ingestion. Reasons:
>
> (1) A blockchain connector needs to support different chains, including
> Ethereum, Bitcoin, Solana, etc., through JSON-RPC.
>
> (2) Like EtherScan, an explorer needs to fetch the latest blocks into
> storage for the index to search.
>
> (3) As a supplement to (2), we need a connector to fully sync all blocks
> from a blockchain node. I think Flink's stream/batch alignment feature is
> suited to this scenario.
>
> (4) Following FLIP-27, we could use the block number as the SourceSplit to
> read. It is very natural.
>
> (5) The Flink community could use the Web3 topic to gain PR exposure in the
> Web3 ecosystem.
>
>
> [1]
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-30445?filter=allissues
>
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>


Re: [DISCUSS] Extending the feature freezing date of Flink 1.17

2023-01-02 Thread David Anderson
I'm also in favor of extending the feature freeze to Jan 31st.

David

On Thu, Dec 29, 2022 at 9:01 AM Leonard Xu  wrote:

> Thanks Qingsheng for the proposal, the pandemic has really impacted
> development schedules.
>
> Jan 31st makes sense to me.
>
>
> Best,
> Leonard
>
>


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

2023-01-02 Thread Yuxin Tan
Hi all,

Thanks for all the feedback so far.

The discussion has been going on for some time. If there are no
more new comments, we will start a vote today.

Best,
Yuxin


On Thu, Dec 29, 2022 at 5:37 PM Yuxin Tan wrote:

> Hi, everyone
>
> Thanks for the reply and the discussion.
>
> We discussed this with @Guowei Ma, @Dong Lin, and @Yanfei Lei
> offline, and reached a consensus on this FLIP. Based on the offline
> discussions and suggestions from @Weihua Hu, the following changes
> have been updated in the FLIP.
>
> 1. Changes in public interfaces.
> - Updated the descriptions of the newly added config to describe the
> option more clearly.
> - The new config will be marked as experimental in the first release,
> and we will revisit this in the next release based on the user feedback.
> - In the long run, with the new config, we think the original two configs
> can be deprecated. At this stage, since the new config is still
> experimental,
> we will not immediately deprecate them.
> - Renamed the config key to
> taskmanager.memory.network.read-buffer.required-per-gate.max for
> more clarity (see the sketch after this list).
> 2. Modify the floating buffer calculation method.
> - When the memory used reaches the threshold, the number of exclusive
> buffers is gradually reduced in a fine-grained manner, rather than
> directly
> reducing the number of exclusive buffers to 0.
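(For reference, the renamed option might be declared roughly like this; the
key comes from the message above, while the type, default, and description
here are assumptions, not the FLIP's final definition:)

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ReadBufferOptionSketch {

    public static final ConfigOption<Integer> REQUIRED_READ_BUFFERS_PER_GATE_MAX =
            ConfigOptions.key(
                            "taskmanager.memory.network.read-buffer.required-per-gate.max")
                    .intType()
                    .noDefaultValue()
                    .withDescription(
                            "Maximum number of required network read buffers per input "
                                    + "gate; demand beyond this is served by floating buffers.");
}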
>
> Best,
> Yuxin
>
>
> On Thu, Dec 29, 2022 at 2:48 PM Yuxin Tan wrote:
>
>> Hi, Roman
>>
>> Sorry, I missed one question just now.
>>
>> >  if the two configuration options are still in use, why does the FLIP
>> propose to deprecate them?
>> These two configs are usually used to avoid the memory issue, but
>> after introducing the improvement, generally, I think it is no longer
>> necessary to adjust these two configurations to avoid the issue. So
>> I propose to deprecate them in the future when the @Experimental
>> annotation of the newly added config is removed.
>>
>> Best,
>> Yuxin
>>
>>
>> On Wed, Dec 28, 2022 at 8:10 PM Roman Khachatryan wrote:
>>
>>> Thanks for your reply Yuxin,
>>>
>>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
>>> > configurations, which are not calculated. I have described them in the
>>> FLIP
>>> > motivation section.
>>>
>>> The motivation section says about floating buffers:
>>> > FloatingBuffersPerGate is within the range of
>>> [numFloatingBufferThreshold, ExclusiveBuffersPerChannel * numChannels +
>>> DefaultFloatingBuffersPerGate] ...
>>> So my question is what value exactly in this range will it have and how
>>> and
>>> where will it be computed?
>>>
>>> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
>>> to calculate it dynamically (by linear search
>>> from taskmanager.network.memory.buffers-per-channel down to 0).
>>>
>>> Also, if the two configuration options are still in use, why does the
>>> FLIP
>>> propose to deprecate them?
>>>
>>> Besides that, wouldn't it be clearer to separate the motivation from the
>>> proposed changes?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote:
>>>
>>> > Hi Yuxin
>>> >
>>> >
>>> > Thanks for the proposal, big + 1 for this FLIP.
>>> >
>>> >
>>> >
>>> > It is difficult for users to calculate the size of network memory. If the
>>> > setting is too small, the task cannot be started. If the setting is too
>>> > large, there may be a waste of resources. The Flink framework should
>>> > automatically set a reasonable value as far as possible, but I have a
>>> > small question: network memory is not only related to the parallelism of
>>> > the task, but also to the complexity of the task DAG. The more complex a
>>> > DAG is, the larger the buffers that shuffle write and shuffle read
>>> > require. How can we determine how many RS and IG a DAG has?
>>> >
>>> >
>>> >
>>> > Best
>>> > JasonLee
>>> >
>>> >
>>> >  Replied Message 
>>> > | From | Yuxin Tan |
>>> > | Date | 12/28/2022 18:29 |
>>> > | To |  |
>>> > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory
>>> configurations
>>> > for TaskManager |
>>> > Hi, Roman
>>> >
>>> > Thanks for the replay.
>>> >
>>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
>>> > configurations, which are not calculated. I have described them in the
>>> FLIP
>>> > motivation section.
>>> >
>>> > 3. Each gate requires at least one buffer...
>>> > The timeout exception occurs when the ExclusiveBuffersPerChannel
>>> > cannot be requested from the NetworkBufferPool, which is not caused by
>>> > the change in this FLIP. In addition, setting ExclusiveBuffersPerChannel
>>> > to 0 when using floating buffers can also decrease the probability of
>>> > this exception.
>>> >
>>> > 4. It would be great to have experimental results for jobs with
>>> > different exchange types.
>>> > Thanks for the suggestion. I have tested different exchange types,
>>> > forward and rescale, and the results show no 

Re: [DISCUSS] FLIP-283: Use adaptive batch scheduler as default scheduler for batch jobs

2023-01-02 Thread Lijie Wang
Hi Junrui,

Thanks for driving this FLIP, +1 for this proposal. I believe it will
greatly improve the experience of batch users.

Best,
Lijie

On Fri, Dec 30, 2022 at 12:40 PM Zhu Zhu wrote:

> Hi Junrui,
>
> Thanks for creating this FLIP!
>
> AdaptiveBatchScheduler is more powerful than DefaultScheduler for batch
> scheduling, and it also has some must-have features like speculative
> execution. It will be great if users can use it easily, without being
> required to know the underlying scheduler and configure advanced items.
>
> So generally +1 for this proposal.
>
> Regarding the configuration key renaming, like yuxia mentioned, we should
> deprecate the old ones and add new ones with new names, to guarantee
> compatibility.
>
> Thanks,
> Zhu
>
> On Fri, Dec 30, 2022 at 11:10 AM yuxia wrote:
> >
> > Hi, JunRui Lee.
> >
> > Thanks for driving this FLIP. It must be a good improvement for the batch
> > user experience.
> > I have a few questions about this FLIP:
> > 1: About the configuration renaming: will the old configurations be
> > deprecated or removed directly? If users upgrade their Flink version, will
> > these old configurations still be considered or just ignored? If ignored,
> > users may need to modify their configurations after they upgrade Flink.
> >
> > 2: I'm curious in which case users would disable auto parallelism
> > derivation if they have enabled the adaptive batch scheduler. IIUC, auto
> > parallelism derivation is what the adaptive batch scheduler aims to do. If
> > users want to disable auto parallelism derivation, can they just disable
> > the adaptive batch scheduler?
> >
> > Best regards,
> > Yuxia
> >
> > ----- Original Message -----
> > From: "JunRui Lee"
> > To: "dev"
> > Sent: Thursday, Dec 29, 2022, 7:45:36 PM
> > Subject: [DISCUSS] FLIP-283: Use adaptive batch scheduler as default
> scheduler for batch jobs
> >
> > Hi, devs,
> >
> > I'd like to start a discussion about FLIP-283: Use adaptive batch
> > scheduler as default scheduler for batch jobs[1].
> >
> > In FLIP-187, we introduced an adaptive batch scheduler. The adaptive
> > batch scheduler has stronger batch scheduling capabilities, including
> > automatically deciding parallelisms of job vertices for batch
> > jobs (FLIP-187)[2], data balanced distribution (FLINK-29663)[3],
> > and speculative execution (FLIP-168)[4]. To further use the adaptive
> > batch scheduler to improve flink's batch capability, in this FLIP
> > we aim to make the adaptive batch scheduler as the default batch
> > scheduler.
> >
> > Currently, users have to set some configuration of the adaptive
> > batch scheduler, which is not very convenient. To use the adaptive
> > batch scheduler as the default batch scheduler, we need to improve
> > the user's out-of-the-box experience. Therefore,  we also need to
> > optimize the current adaptive batch scheduler configuration.
> >
> > Looking forward to your feedback.
> >
> > [1]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs
> > [2]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Scheduler
> > [3]:https://issues.apache.org/jira/browse/FLINK-29663
> > [4]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> >
> > Best regards,
> > JunRui Lee
>


Re: [DISCUSS] FLIP-274 : Introduce metric group for OperatorCoordinator

2023-01-02 Thread Hang Ruan
Hi, Jark and Dong,

Thanks for your comments. Sorry for my late reply.

For suggestion 1, I plan to implement the SplitEnumeratorMetricGroup in
another issue; it is not contained in this FLIP. I will add some
description of this part.
For suggestion 2, the changes to OperatorCoordinator#metricGroup have
already been documented in the proposed changes section.

Best,
Hang

On Sun, Jan 1, 2023 at 9:45 AM Dong Lin wrote:

> Let me chime in and add comments regarding the public interface section.
> Please see my comments inline.
>
> On Thu, Dec 29, 2022 at 6:08 PM Jark Wu  wrote:
>
> > Hi Hang,
> >
> > Thanks for driving this discussion. I think this is a very useful feature
> > for connectors.
> >
> > The FLIP looks quite good to me, and I just have two suggestions.
> >
> > 1. In the "Public Interface" section, mention that the implementation
> > behavior of "SplitEnumeratorContext#metricGroup" is changed from
> returning
> > null to returning a concrete SplitEnumeratorMetricGroup instance. Even
> > though the API is already there, the behavior change can also be
> considered
> > a public change.
> >
>
> SplitEnumeratorContext#metricGroup is an interface and this FLIP does not
> seem to change its semantics/behavior. The FLIP does change the
> implementation/behavior of SourceCoordinatorContext#metricGroup, which is
> marked @Internal.
>
> Thus it might seem a bit weird to add in the public interface section
> saying that we change the interface SplitEnumeratorContext#metricGroup from
> returning null to non-null object.
>
> 2. Mention the newly added interface of "OperatorCoordinator#metricGroup"
> > in the "Proposed Changes" section or "Public Interface" section. As the
> > FLIP said, OperatorCoordinator is widely used in many connectors. Though
> it
> > is still an @Internal API, I think it is worth mentioning the change in
> the
> > FLIP.
>
>
> Since OperatorCoordinator is an internal API, it seems reasonable to
> explain it in the proposed change section. The FLIP seems to have
> documented this in point 5 of the proposed change section.
>
> BTW, if we think there are @internal classes that are important enough to
> be added in the public interface section, it might be useful to explicitly
> discuss this topic and document it in the "*What are the "public
> interfaces" of the project*" in this
> <
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >
> wiki.
>
>
> > Best,
> > Jark
> >
> >
> > On Mon, 26 Dec 2022 at 18:06, Hang Ruan  wrote:
> >
> > > Hi, thanks for the feedback, Zhu Zhu and Qingsheng.
> > > After combining everyone's comments, the main concerns and
> corresponding
> > > adjustments are as follows.
> > >
> > > Q1: Common metrics are not quite useful.
> > > numEventsIn and numEventsOut counters will be removed from the
> > > OperatorCoordinatorMetricGroup. These common metrics do not provide
> > > enough information for users. Users would rather get the number of
> > > events of a specific type than the total number, and such a metric is
> > > calculated differently by each implementation. The implementations can
> > > register the metrics themselves.
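(As a concrete illustration of registering such a metric directly: the class
and metric names below are made up; MetricGroup#counter is the standard Flink
metrics API.)

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

class MyOperatorCoordinator {

    private final Counter splitRequests;

    MyOperatorCoordinator(MetricGroup coordinatorMetricGroup) {
        // Count the one event type this coordinator cares about.
        this.splitRequests = coordinatorMetricGroup.counter("numSplitRequestsReceived");
    }

    void onSplitRequest() {
        splitRequests.inc(); // per-event-type count instead of a generic numEventsIn
    }
}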
> > >
> > > Q2: This FLIP is overly complicated.
> > > This FLIP will become concise after these modifications.
> > > OperatorCoordinatorMetricGroup has already been introduced into Flink by
> > > FLIP-179 (
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
> > > ), and this FLIP will not change it. This FLIP only provides a new metric
> > > option and a new metric group scope. The proposed changes section gives
> > > the details of the modifications to the internal classes, which might
> > > make it look complicated.
> > >
> > > Thanks for all the comments again. If there are no further comments, we
> > > plan to start the voting thread this week.
> > >
> > > Best,
> > > Hang
> > >
> > > On Mon, Dec 26, 2022 at 4:48 PM Qingsheng Ren wrote:
> > >
> > > > Thanks for the FLIP, Hang!
> > > >
> > > > This FLIP overall looks good to me. Actually I share the same concern
> > > with
> > > > Zhu that numEventsIn and numEventsOut counters are not quite useful
> to
> > > end
> > > > users. OperatorEvent is a quite low-level abstraction, which requires
> > > > instantialization in order to be practical to users and developers,
> so
> > > > maybe it's better to exclude them from the FLIP.
> > > >
> > > > Best,
> > > > Qingsheng
> > > > Ververica (Alibaba)
> > > >
> > > > On Mon, Dec 26, 2022 at 12:08 PM Zhu Zhu  wrote:
> > > >
> > > > > Hi Hang,
> > > > > I still see no strong reason why we need numEventsIn/numEventsOut
> > > > metrics.
> > > > > In the discussion in FLINK-29801, I can see the same concern from
> > > others.
> > > > > So I prefer to exclude them from this FLIP to avoid over-extending
> > the
> > > > > scope.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > > On Fri, Dec 23, 2022 at 3:21 PM Hang Ruan wrote:
> > > > > >
> > > > > > Hi 

[jira] [Created] (FLINK-30542) Support adaptive hash aggregate in runtime

2023-01-02 Thread Yunhong Zheng (Jira)
Yunhong Zheng created FLINK-30542:
-

 Summary: Support adaptive hash aggregate in runtime
 Key: FLINK-30542
 URL: https://issues.apache.org/jira/browse/FLINK-30542
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: Yunhong Zheng
 Fix For: 1.17.0


Introduce a new strategy to adaptively determine whether the local hash 
aggregate is required, according to the aggregation degree observed in the 
local hash aggregate at runtime.
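A toy illustration of the decision (the sample size and the distinct-key 
ratio threshold are invented; the real decision is made inside the generated 
hash-aggregate operator):

{code}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy: sample the first keys of the input; if nearly every key is distinct,
// the local hash aggregate barely reduces the data and can be skipped.
class AdaptiveLocalAggDecision {

    static boolean keepLocalAggregate(List<String> sampledKeys, double maxDistinctRatio) {
        Set<String> distinct = new HashSet<>(sampledKeys);
        double ratio = (double) distinct.size() / sampledKeys.size();
        return ratio <= maxDistinctRatio; // low ratio => good aggregation degree
    }

    public static void main(String[] args) {
        List<String> heavyDuplicates = List.of("a", "a", "b", "a", "b", "a");
        List<String> mostlyUnique = List.of("a", "b", "c", "d", "e", "f");
        System.out.println(keepLocalAggregate(heavyDuplicates, 0.8)); // true
        System.out.println(keepLocalAggregate(mostlyUnique, 0.8));    // false
    }
}
{code}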



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Flink Forward Session Question

2023-01-02 Thread Márton Balassi
Hi Rion,

Unlike the previous Flink Forwards, to the best of my knowledge the latest
edition was not uploaded to YouTube. It might make sense to reach out to
the authors directly.

On Sat, Dec 31, 2022 at 5:35 PM Rion Williams  wrote:

> Hey Flinkers,
>
> Firstly, early Happy New Year’s to everyone in the community. I’ve been
> digging a bit into exactly-once processing with Flink and Pinot and I came
> across this session from Flink Foward last year:
>
> -
> https://www.slideshare.net/FlinkForward/exactlyonce-financial-data-processing-at-scale-with-flink-and-pinot
>
> I was curious if anyone knew whether this session was recorded, as the deck
> itself seemed to have quite a bit of value. I figured the mailing list
> would be a reasonable place to ask.
>
> Thanks in advance,
>
> Rion
>


[jira] [Created] (FLINK-30541) Add Transformer and Estimator for OnlineStandardScaler

2023-01-02 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-30541:
-

 Summary: Add Transformer and Estimator for OnlineStandardScaler
 Key: FLINK-30541
 URL: https://issues.apache.org/jira/browse/FLINK-30541
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30540) DataSinkTaskTest failed with a 143 exit code

2023-01-02 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30540:
-

 Summary: DataSinkTaskTest failed with a 143 exit code
 Key: FLINK-30540
 URL: https://issues.apache.org/jira/browse/FLINK-30540
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task, Tests
Affects Versions: 1.17.0
Reporter: Matthias Pohl


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44366=logs=a549b384-c55a-52c0-c451-00e0477ab6db=eef5922c-08d9-5ba3-7299-8393476594e7=8480

We experienced a 143 exit code when finalizing {{DataSinkTaskTest}}:
{code}

Jan 01 00:58:47 [ERROR] at 
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216)
Jan 01 00:58:47 [ERROR] at org.apache.maven.cli.MavenCli.main(MavenCli.java:160)
Jan 01 00:58:47 [ERROR] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Jan 01 00:58:47 [ERROR] at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jan 01 00:58:47 [ERROR] at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jan 01 00:58:47 [ERROR] at java.lang.reflect.Method.invoke(Method.java:498)
Jan 01 00:58:47 [ERROR] at 
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
Jan 01 00:58:47 [ERROR] at 
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
Jan 01 00:58:47 [ERROR] at 
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
Jan 01 00:58:47 [ERROR] at 
org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Jan 01 00:58:47 [ERROR] Caused by: 
org.apache.maven.surefire.booter.SurefireBooterForkException: The forked VM 
terminated without properly saying goodbye. VM crash or System.exit called?
Jan 01 00:58:47 [ERROR] Command was /bin/sh -c cd /__w/3/s/flink-runtime && 
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -XX:+UseG1GC -Xms256m -Xmx768m 
-jar 
/__w/3/s/flink-runtime/target/surefire/surefirebooter7079560860955914030.jar 
/__w/3/s/flink-runtime/target/surefire 2023-01-01T00-54-17_721-jvmRun4 
surefire118217553075734742tmp surefire_1430697542098749596tmp
Jan 01 00:58:47 [ERROR] Error occurred in starting fork, check output in log
Jan 01 00:58:47 [ERROR] Process Exit Code: 134
Jan 01 00:58:47 [ERROR] Crashed tests:
Jan 01 00:58:47 [ERROR] org.apache.flink.runtime.operators.DataSinkTaskTest
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.fork(ForkStarter.java:748)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.access$700(ForkStarter.java:121)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter$1.call(ForkStarter.java:393)
Jan 01 00:58:47 [ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter$1.call(ForkStarter.java:370)
Jan 01 00:58:47 [ERROR] at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
Jan 01 00:58:47 [ERROR] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Jan 01 00:58:47 [ERROR] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Jan 01 00:58:47 [ERROR] at java.lang.Thread.run(Thread.java:748)
Jan 01 00:58:47 [ERROR] -> [Help 1]
Jan 01 00:58:47 [ERROR] 
Jan 01 00:58:47 [ERROR] To see the full stack trace of the errors, re-run Maven 
with the -e switch.
Jan 01 00:58:47 [ERROR] Re-run Maven using the -X switch to enable full debug 
logging.
Jan 01 00:58:47 [ERROR] 
Jan 01 00:58:47 [ERROR] For more information about the errors and possible 
solutions, please read the following articles:
Jan 01 00:58:47 [ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30539) YARNSessionCapacitySchedulerITCase.testDetachedPerJobYarnCluster and testDetachedPerJobYarnClusterWithStreamingJob timing out

2023-01-02 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30539:
-

 Summary: 
YARNSessionCapacitySchedulerITCase.testDetachedPerJobYarnCluster and 
testDetachedPerJobYarnClusterWithStreamingJob timing out
 Key: FLINK-30539
 URL: https://issues.apache.org/jira/browse/FLINK-30539
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.16.0
Reporter: Matthias Pohl


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44337=logs=298e20ef-7951-5965-0e79-ea664ddc435e=d4c90338-c843-57b0-3232-10ae74f00347=32023

Both tests failed because they ran into the 60s timeout that was defined for 
each of them. We should get rid of the timeout so that the thread dump becomes 
accessible. It might be related to FLINK-24169.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)