Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-28 Thread Jing Zhang
Hi, Lincoln
Thanks a lot for the feedback.

>  Regarding the hint name ‘USE_HASH’, could we consider more candidates?
Things are a little different from RDBMS in the distributed world, and we
also aim to solve the data skew problem, so all these incoming hints names
should be considered together.

As for the skew problem, I would like to discuss it in a separate FLIP, but I can
share the hint proposal for skew here.
I want to introduce a 'SKEW' hint, which is a query hint similar to the skew hint
in Spark [1] and MaxCompute [2].
In its simplest form, the 'SKEW' hint only contains the name of the skewed table.
It could also accept a table name plus column names, or a table name, column names
and the skewed values.
For example:

SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

The 'SKEW' hint is not only useful for lookup join here; it could also be used for
other types of joins later, for example batch hash join or streaming regular join.
Coming back to the naming question for hash lookup join: since the 'SKEW' hint
is a separate hint, 'USE_HASH' is still a viable candidate.
WDYT?
I don't have a better hint name in mind yet. I would like to
hear more suggestions about hint names.

>  As you mentioned in the flip, this solution depends on future changes to
calcite (and also upgrading calcite would be another possible big change:
at least calicite-1.30 vs 1.26, are we preparing to accept this big
change?).

Indeed, solution 1 depends on a Calcite upgrade.
I admit that upgrading from Calcite 1.26 to 1.30 would be a big change; I still
remember what we suffered in the last upgrade to Calcite 1.26.
However, we cannot avoid the upgrade forever, for the following reasons:
1. Other features also depend on the Calcite upgrade, for example Session
Window and Count Window.
2. If we keep avoiding Calcite upgrades, the gap with the latest version only
grows, and when upgrading eventually becomes unavoidable, the pain will be
even greater.

WDYT?

>  Is there another possible way to minimize the change in calcite?

Have you checked the 'Other Alternatives' part of FLIP-204? It describes
another solution which does not depend on a Calcite upgrade and does not need
to worry about the hint being lost during propagation.
This is also what we have done in our internal version.
The core idea is propagating the 'USE_HASH' hint to the TableScan with the
matching table name. However, it is a little hacky.

> As I know there're more limitations than `Correlate`.

As mentioned before, our internal version follows the 'Other
Alternatives' approach in FLIP-204.
Although I did a POC of solution 1 and listed all the changes I found in the
FLIP, there may still be something I missed.
I'm very glad you pointed out that there are more limitations besides
`Correlate`; would you please give more details on this part?

Best,
Jing Zhang

[1] https://docs.databricks.com/delta/join-performance/skew-join.html
[2]
https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669

Jing Zhang wrote on Wed, Dec 29, 2021 at 14:40:

> Hi Yuan and Lincoln,
> thanks a lot for the attention. I would answer the email one by one.
>
> To Yuan
> > How shall we deal with CDC data? If there is CDC data in the pipeline,
> IMHO, shuffle by join key will cause CDC data disorder. Will it be better
> to use primary key in this case?
>
> Good question.
> The problem could not only exists in CDC data source, but also exists when
> the input stream is not insert-only stream (for example, the result of
> unbounded aggregate or regular join).
> I think use hash by primary key is not a good choise. It could not raise
> the cache hit because cache key is look up key instead of primary key of
> input.
>
> To avoid wrong result, hash lookup Join requires that the input stream
> should be insert_only stream or its upsert keys contains lookup keys.
>
> I've added this limitation to FLIP, thanks a lot for reminding.
>
> > If the shuffle keys can be customized  when users have the knowledge
> about distribution of data?
>
> I'm not sure I understand your question.
>
> Do you mean to support user defined partitioner function on keys just like
> flink DataStream sql?
> If yes, I'm afraid there is no plan to support this feature yet because
> the feature involves many things, for example:
> 1. sql syntax
> 2. user defined partitioner API
> 3. RelDistribution type extension and Flink RelDistribution extension
> 4. FlinkExpandConversionRule
> 5. Exchange execNode extension
> 6. 
> It needs well designed and more discussion. If this is a strong
> requirement, we would drive another discussion on this point individually.
> In this FLIP, I would first support hash shuffle. WDYT?
>
> Or do you mean support hash by other keys instead of lookup key?
> If yes, would 

[jira] [Created] (FLINK-25474) Idea Scala plugin can not compile RexExplainUtil

2021-12-28 Thread godfrey he (Jira)
godfrey he created FLINK-25474:
--

 Summary: Idea Scala plugin can not compile RexExplainUtil 
 Key: FLINK-25474
 URL: https://issues.apache.org/jira/browse/FLINK-25474
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.15.0
 Attachments: image-2021-12-29-15-16-50-513.png, 
image-2021-12-29-15-20-18-794.png

Idea version: 2021.2.3
Scala version: 2.11.12

IDEA shows errors in {{RexExplainUtil}} and in many classes which use the 
methods of {{RexExplainUtil}}. 
NOTE: those classes can actually be compiled and executed successfully.

 !image-2021-12-29-15-16-50-513.png! 
 !image-2021-12-29-15-20-18-794.png! 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #2

2021-12-28 Thread Dong Lin
+1 (non-binding)

- Verified that the checksums and GPG files match the corresponding release
files
- Verified that the source distributions do not contain any binaries
- Built the source distribution with Maven to ensure all source files have
Apache headers
- Verified that all POM files point to the same version
- Checked that the README.md file does not have anything unexpected
- Checked JIRA release notes
- Checked source code tag "release-2.0.0-rc2"




On Tue, Dec 28, 2021 at 8:58 PM Yun Gao 
wrote:

> Hi everyone,
> Please review and vote on the release candidate #2 for the version 2.0.0
> of Apache Flink ML,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> **Testing Guideline**
> You can find here [1] a page in the project wiki on instructions for
> testing.
> To cast a vote, it is not necessary to perform all listed checks, but
> please
> mention which checks you have performed when voting.
> **Release Overview**
> As an overview, the release consists of the following:
> a) Flink ML source release to be deployed to dist.apache.org
> b) Flink ML Python source distributions to be deployed to PyPI
> c) Maven artifacts to be deployed to the Maven Central Repository
> **Staging Areas to Review**
> The staging areas containing the above mentioned artifacts are as follows,
> for your review:
> * All artifacts for a) and b) can be found in the corresponding dev
> repository at dist.apache.org [2], which are signed with the key with
> fingerprint CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3]
> * All artifacts for c) can be found at the Apache Nexus Repository [4]
> Other links for your review:
> * JIRA release notes [5]
> * Source code tag "release-2.0.0-rc2" [6]
> * PR to update the website Downloads page to include Flink ML links [7]
> **Vote Duration**
> The voting time will run for at least 72 hours.
> It is adopted by majority approval, with at least 3 PMC affirmative votes.
> Thanks,
> Dong and Yun
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc2/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1475
> [5]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079
> [6] https://github.com/apache/flink-ml/releases/tag/release-2.0.0-rc2
> [7] https://github.com/apache/flink-web/pull/493
>


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-28 Thread Jingsong Li
Thanks Till for your suggestions.

Personally, I like flink-warehouse; it is what we want to convey to
the user, but it suggests a somewhat wider scope than we actually cover.

How about just calling it flink-store?
Simply to convey an impression: this is flink's store project,
providing a built-in store for the flink compute engine, which can be
used by flink-table as well as flink-datastream.

Best,
Jingsong

On Tue, Dec 28, 2021 at 5:15 PM Till Rohrmann  wrote:
>
> Hi Jingsong,
>
> I think that developing flink-dynamic-storage as a separate sub project is
> a very good idea since it allows us to move a lot faster and decouple
> releases from Flink. Hence big +1.
>
> Do we want to name it flink-dynamic-storage or shall we use a more
> descriptive name? dynamic-storage sounds a bit generic to me and I wouldn't
> know that this has something to do with letting Flink manage your tables
> and their storage. I don't have a very good idea but maybe we can call it
> flink-managed-tables, flink-warehouse, flink-olap or so.
>
> Cheers,
> Till
>
> On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
> wrote:
>
> > Hi Jingsong,
> >
> > That sounds promising! +1 from my side to continue development under
> > flink-dynamic-storage as a Flink subproject. I think having a more in-depth
> > interface will benefit everyone.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:
> >
> >> Hi all,
> >>
> >> After some experimentation, we felt no problem putting the dynamic
> >> storage outside of flink, and it also allowed us to design the
> >> interface in more depth.
> >>
> >> What do you think? If there is no problem, I am asking for PMC's help
> >> here: we want to propose flink-dynamic-storage as a flink subproject,
> >> and we want to build the project under apache.
> >>
> >> Best,
> >> Jingsong
> >>
> >>
> >> On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
> >> wrote:
> >> >
> >> > Hi Stephan,
> >> >
> >> > Thanks for your reply.
> >> >
> >> > Data never expires automatically.
> >> >
> >> > If there is a need for data retention, the user can choose one of the
> >> > following options:
> >> > - In the SQL for querying the managed table, users filter the data by
> >> themselves
> >> > - Define the time partition, and users can delete the expired
> >> > partition by themselves. (DROP PARTITION ...)
> >> > - In the future version, we will support the "DELETE FROM" statement,
> >> > users can delete the expired data according to the conditions.
> >> >
> >> > So to answer your question:
> >> >
> >> > > Will the VMQ send retractions so that the data will be removed from
> >> the table (via compactions)?
> >> >
> >> > The current implementation is not sending retraction, which I think
> >> > theoretically should be sent, currently the user can filter by
> >> > subsequent conditions.
> >> > And yes, the subscriber would not see strictly a correct result. I
> >> > think this is something we can improve for Flink SQL.
> >> >
> >> > > Do we want time retention semantics handled by the compaction?
> >> >
> >> > Currently, no, Data never expires automatically.
> >> >
> >> > > Do we want to declare those types of queries "out of scope" initially?
> >> >
> >> > I think we want users to be able to use three options above to
> >> > accomplish their requirements.
> >> >
> >> > I will update FLIP to make the definition clearer and more explicit.
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 
> >> wrote:
> >> > >
> >> > > Thanks for digging into this.
> >> > > Regarding this query:
> >> > >
> >> > > INSERT INTO the_table
> >> > >   SELECT window_end, COUNT(*)
> >> > > FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5'
> >> MINUTES))
> >> > > GROUP BY window_end
> >> > >   HAVING now() - window_end <= INTERVAL '14' DAYS;
> >> > >
> >> > > I am not sure I understand what the conclusion is on the data
> >> retention question, where the continuous streaming SQL query has retention
> >> semantics. I think we would need to answer the following questions (I will
> >> call the query that computed the managed table the "view materializer
> >> query" - VMQ).
> >> > >
> >> > > (1) I guess the VMQ will send no updates for windows beyond the
> >> "retention period" is over (14 days), as you said. That makes sense.
> >> > >
> >> > > (2) Will the VMQ send retractions so that the data will be removed
> >> from the table (via compactions)?
> >> > >   - if yes, this seems semantically better for users, but it will be
> >> expensive to keep the timers for retractions.
> >> > >   - if not, we can still solve this by adding filters to queries
> >> against the managed table, as long as these queries are in Flink.
> >> > >   - any subscriber to the changelog stream would not see strictly a
> >> correct result if we are not doing the retractions
> >> > >
> >> > > (3) Do we want time retention semantics handled by the compaction?
> >> > >   - if we say that we lazily apply the deletes in 

Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-28 Thread Jing Zhang
Hi Yuan and Lincoln,
thanks a lot for the attention. I would answer the email one by one.

To Yuan
> How shall we deal with CDC data? If there is CDC data in the pipeline,
IMHO, shuffle by join key will cause CDC data disorder. Will it be better
to use primary key in this case?

Good question.
The problem exists not only for CDC data sources, but also whenever the input
stream is not an insert-only stream (for example, the result of an unbounded
aggregate or a regular join).
I don't think hashing by the primary key is a good choice: it would not raise
the cache hit ratio, because the cache key is the lookup key rather than the
primary key of the input.

To avoid wrong results, hash lookup join requires that the input stream is an
insert-only stream or that its upsert keys contain the lookup keys.

I've added this limitation to the FLIP, thanks a lot for the reminder.

> If the shuffle keys can be customized  when users have the knowledge
about distribution of data?

I'm not sure I understand your question.

Do you mean supporting a user-defined partitioner function on keys, just like
the Flink DataStream API?
If yes, I'm afraid there is no plan to support this feature yet, because it
involves many things, for example:
1. sql syntax
2. user defined partitioner API
3. RelDistribution type extension and Flink RelDistribution extension
4. FlinkExpandConversionRule
5. Exchange execNode extension
6. 
It needs a careful design and more discussion. If this is a strong
requirement, we could drive a separate discussion on this point.
In this FLIP, I would first support hash shuffle. WDYT?

Or do you mean hashing by other keys instead of the lookup key?
If yes, would you please share a specific use case?
We need to fetch records from the dimension table's external storage by the
lookup key, so the dimension table sources use lookup keys as the cache key.
We can only increase the cache hit ratio by shuffling on the lookup keys.
I need more use cases to understand this requirement.

> Some connectors such as hive, caches all data in LookupFunction. How to
decrease the valid cache data size if data can be shuffled?

Very good idea.
There are two types of cache.
For key-value storages such as Redis/HBase, the lookup table source lazily stores
the visited lookup keys and their records into the cache.
For storages without keys, such as Hive, each task eagerly loads all data into the
cache in the initialization phase.
After introducing the hash partitioner, key-value storages need no change; for
Hive, each task only needs to load part of the data into its cache instead of all
of it.

We have implemented this optimization in our internal version.
The core idea is to push the partitioner information down to the lookup table
source. When loading data into the cache, each task only stores the records whose
lookup keys are routed to the current task.
We call this 'HashPartitionedCache'.
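To make this more concrete, here is a minimal, self-contained sketch of the idea
(illustrative only; the class name, the String key/row types and the hashCode-based
partitioner below are assumptions, not the actual FLIP-204 API). A real
implementation must use exactly the same partitioning function as the upstream
hash exchange, otherwise the cache would be filtered with the wrong key-to-task
mapping.

import java.util.HashMap;
import java.util.Map;

public class HashPartitionedCacheSketch {

    private final int subtaskIndex;
    private final int numSubtasks;
    private final Map<String, String> cache = new HashMap<>();

    public HashPartitionedCacheSketch(int subtaskIndex, int numSubtasks) {
        this.subtaskIndex = subtaskIndex;
        this.numSubtasks = numSubtasks;
    }

    // Placeholder for the partitioner pushed down from the planner.
    private int targetSubtask(String lookupKey) {
        return Math.floorMod(lookupKey.hashCode(), numSubtasks);
    }

    // Called for every row scanned from the dimension table during the eager load phase.
    public void maybeCache(String lookupKey, String row) {
        // Keep the row only if the upstream exchange would route this key to us.
        if (targetSubtask(lookupKey) == subtaskIndex) {
            cache.put(lookupKey, row);
        }
    }

    public String lookup(String lookupKey) {
        return cache.get(lookupKey);
    }
}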

I have added this point to the lookup join requirements list in the motivation
section of the FLIP, but I would not address it in this FLIP right now.
If this is a strong requirement, we would need to drive a separate discussion on
this topic, because it involves many API extensions.

Best,
Jing Zhang


Lincoln Lee wrote on Wed, Dec 29, 2021 at 10:01:

> Hi Jing,
> Thanks for bringing up this discussion!  Agree that this join hints
> should benefit both bounded and unbounded cases as Martin mentioned.
> I also agree that implementing the query hint is the right way for a more
> general purpose since the dynamic table options has a limited scope.
>Some points I'd like to share are:
> 1. Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> Things are a little different from RDBMS in the distributed world, and we
> also aim to solve the data skew problem, so all these incoming hints names
> should be considered together.
> 2. As you mentioned in the flip, this solution depends on future changes to
> calcite (and also upgrading calcite would be another possible big change:
> at least calicite-1.30 vs 1.26, are we preparing to accept this big
> change?). Is there another possible way to minimize the change in calcite?
> As I know there're more limitations than `Correlate`.
>
> Best,
> Lincoln Lee
>
>
> Jing Zhang  于2021年12月28日周二 23:04写道:
>
> > Hi Martijn,
> > Thanks a lot for your attention.
> > I'm sorry I didn't explain the motivation clearly. I would like to
> explain
> > it in detail, and then give response on your questions.
> > A lookup join is typically used to enrich a table with data that is
> queried
> > from an external system. Many Lookup table sources introduce cache in
> order
> > to reduce the RPC call, such as JDBC, CSV, HBase connectors.
> > For those connectors, we could raise cache hit ratio by routing the same
> > lookup keys to the same task instance. This is the purpose of
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > .
> > Other cases might benefit from Hash distribution, such as batch hash join
> > as 

[jira] [Created] (FLINK-25473) Azure pipeline failed due to stopped hearing from agent Azure Pipelines 11

2021-12-28 Thread Yun Gao (Jira)
Yun Gao created FLINK-25473:
---

 Summary: Azure pipeline failed due to stopped hearing from agent 
Azure Pipelines 11
 Key: FLINK-25473
 URL: https://issues.apache.org/jira/browse/FLINK-25473
 Project: Flink
  Issue Type: Bug
  Components: Build System / Azure Pipelines
Affects Versions: 1.15.0
Reporter: Yun Gao


{code:java}
##[error]We stopped hearing from agent Azure Pipelines 11. Verify the agent 
machine is running and has a healthy network connection. Anything that 
terminates an agent process, starves it for CPU, or blocks its network access 
can cause this error. For more information, see: 
https://go.microsoft.com/fwlink/?linkid=846610
,##[error]
Agent: Azure Pipelines 11
Started: Today at 12:44 PM
Duration: 1h 41m 59s {code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28678=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25472) Update to Log4j 2.17.1

2021-12-28 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25472:
--

 Summary: Update to Log4j 2.17.1
 Key: FLINK-25472
 URL: https://issues.apache.org/jira/browse/FLINK-25472
 Project: Flink
  Issue Type: Technical Debt
Reporter: Martijn Visser
Assignee: Martijn Visser
 Fix For: 1.15.0, 1.13.6, 1.14.3


We should update from Log4j 2.17.0 to 2.17.1 to address CVE-2021-44832: Apache 
Log4j2 vulnerable to RCE via JDBC Appender when attacker controls configuration.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25471) wrong result if table toDataStream then keyBy sum

2021-12-28 Thread zhangzh (Jira)
zhangzh created FLINK-25471:
---

 Summary: wrong result if table toDataStream then keyBy sum
 Key: FLINK-25471
 URL: https://issues.apache.org/jira/browse/FLINK-25471
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.2
Reporter: zhangzh


I have 6 lines like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

then make it into a table with one column "word":

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object TableToDataStreamBatchWordCount {

  def main(args: Array[String]) {

    // create env and tableEnv
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    env.setParallelism(7)
    val tableEnv = StreamTableEnvironment.create(env)

    // make test data, 6 rows
    val resultDS2 = env.fromElements(
      Row.of("Alice"),
      Row.of("alice"),
      Row.of("Bob"),
      Row.of("lily"),
      Row.of("lily"),
      Row.of("lily")
    )(Types.ROW(Types.STRING))

    // DataStream[Row] --> Table --> SQL to upper-case the word column
    val table = tableEnv.fromDataStream(resultDS2).as("word")
    tableEnv.createTemporaryView(s"tmp_table", table)
    val resultTable = tableEnv.sqlQuery(s" select UPPER(word) as word from tmp_table ")

    // SQL-transformed table --> DataStream[String]
    val resultDs = tableEnv.toDataStream(resultTable).map(row => {
      row.getField("word").asInstanceOf[String]
    })

    // keyBy + sum
    val counts: DataStream[(String, Int)] = resultDs
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // print result
    counts.print()

    env.execute("WordCount")
  }
}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25470) Add/Expose/differentiate metrics of checkpoint size between changelog size vs materialization size

2021-12-28 Thread Yuan Mei (Jira)
Yuan Mei created FLINK-25470:


 Summary: Add/Expose/differentiate metrics of checkpoint size 
between changelog size vs materialization size
 Key: FLINK-25470
 URL: https://issues.apache.org/jira/browse/FLINK-25470
 Project: Flink
  Issue Type: Sub-task
Reporter: Yuan Mei






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-203: Incremental savepoints

2021-12-28 Thread Yu Li
Thanks for the proposal Piotr! Overall I'm +1 for the idea, and below are
my two cents:

1. How about adding a "Term Definition" section and clarify what "native
format" (the "native" data persistence format of the current state backend)
and "canonical format" (the "uniform" format that supports switching state
backends) means?

2. IIUC, currently the FLIP proposes to only support incremental savepoint
with native format, and there's no plan to add such support for canonical
format, right? If so, how about writing this down explicitly in the FLIP
doc, maybe in a "Limitations" section, plus the fact that
`HashMapStateBackend` cannot support incremental savepoint before FLIP-151
is done? (side note: @Roman just a kindly reminder, that please take
FLIP-203 into account when implementing FLIP-151)

3. How about changing the description of "the default configuration of the
checkpoints will be used to determine whether the savepoint should be
incremental or not" to something like "the `state.backend.incremental`
setting now denotes the type of native format snapshot and will take effect
for both checkpoint and savepoint (with native type)", to prevent concept
confusion between checkpoint and savepoint?

4. How about putting the notes of behavior change (the default type of
savepoint will be changed to `native` in the future, and by then the taken
savepoint cannot be used to switch state backends by default) to a more
obvious place, for example moving from the "CLI" section to the
"Compatibility" section? (although it will only happen in 1.16 release
based on the proposed plan)

And all the above suggestions apply to our user-facing documentation after the FLIP
is (partially or completely, accordingly) done, if taken (smile).

Best Regards,
Yu


On Tue, 21 Dec 2021 at 22:23, Seth Wiesman  wrote:

> >> AFAIK state schema evolution should work both for native and canonical
> >> savepoints.
>
> Schema evolution does technically work for both formats, it happens after
> the code paths have been unified, but the community has up until this point
> considered that an unsupported feature. From my perspective making this
> supported could be as simple as adding test coverage but that's an active
> decision we'd need to make.
>
> On Tue, Dec 21, 2021 at 7:43 AM Piotr Nowojski 
> wrote:
>
> > Hi Konstantin,
> >
> > > In this context: will the native format support state schema evolution?
> > If
> > > not, I am not sure, we can let the format default to native.
> >
> > AFAIK state schema evolution should work both for native and canonical
> > savepoints.
> >
> > Regarding what is/will be supported we will document as part of this
> > FLIP-203. But it's not as simple as just the difference between native
> and
> > canonical formats.
> >
> > Best, Piotrek
> >
> > pon., 20 gru 2021 o 14:28 Konstantin Knauf 
> napisał(a):
> >
> > > Hi Piotr,
> > >
> > > Thanks a lot for starting the discussion. Big +1.
> > >
> > > In my understanding, this FLIP introduces the snapshot format as a
> > *really*
> > > user facing concept. IMO it is important that we document
> > >
> > > a) that it is not longer the checkpoint/savepoint characteristics that
> > > determines the kind of changes that a snapshots allows (user code,
> state
> > > schema evolution, topology changes), but now this becomes a property of
> > the
> > > format regardless of whether this is a snapshots or a checkpoint
> > > b) the exact changes that each format allows (code, state schema,
> > topology,
> > > state backend, max parallelism)
> > >
> > > In this context: will the native format support state schema evolution?
> > If
> > > not, I am not sure, we can let the format default to native.
> > >
> > > Thanks,
> > >
> > > Konstantin
> > >
> > >
> > > On Mon, Dec 20, 2021 at 2:09 PM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I would like to start a discussion about a previously announced
> follow
> > up
> > > > of the FLIP-193 [1], namely allowing savepoints to be in native
> format
> > > and
> > > > incremental. The changes do not seem invasive. The full proposal is
> > > > written down as FLIP-203: Incremental savepoints [2]. Please take a
> > look,
> > > > and let me know what you think.
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership
> > > > [2]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Semantic
> > > >
> > >
> > >
> > > --
> > >
> > > Konstantin Knauf
> > >
> > > https://twitter.com/snntrable
> > >
> > > https://github.com/knaufk
> > >
> >
>


[DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

2021-12-28 Thread Xingbo Huang
Hi everyone,

I would like to start a discussion thread on "Support PyFlink Runtime
Execution in Thread Mode"

We have provided the PyFlink Runtime framework to support Python user-defined
functions since Flink 1.10. The current PyFlink Runtime framework is called Process
Mode; it depends on an inter-process communication architecture based on
the Apache Beam Portability framework. Although starting a dedicated
process to execute Python user-defined functions gives better resource
isolation, it brings greater resource and performance overhead.

In order to overcome the resource and performance problems of Process Mode,
we propose a new execution mode which executes Python user-defined
functions in the same thread instead of in a separate process.

I have drafted the FLIP-206[1]. Please feel free to reply to this email
thread. Looking forward to your feedback!

Best,
Xingbo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode


Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-28 Thread Lincoln Lee
Hi Jing,
Thanks for bringing up this discussion!  I agree that this join hint
should benefit both bounded and unbounded cases, as Martijn mentioned.
I also agree that implementing it as a query hint is the right way for a more
general purpose, since dynamic table options have a limited scope.
   Some points I'd like to share are:
1. Regarding the hint name ‘USE_HASH’, could we consider more candidates?
Things are a little different from RDBMS in the distributed world, and we
also aim to solve the data skew problem, so all these incoming hint names
should be considered together.
2. As you mentioned in the FLIP, this solution depends on future changes to
Calcite (and also upgrading Calcite would be another possibly big change:
at least Calcite 1.30 vs 1.26; are we prepared to accept this big
change?). Is there another possible way to minimize the change in Calcite?
As far as I know, there are more limitations than just `Correlate`.

Best,
Lincoln Lee


Jing Zhang wrote on Tue, Dec 28, 2021 at 23:04:

> Hi Martijn,
> Thanks a lot for your attention.
> I'm sorry I didn't explain the motivation clearly. I would like to explain
> it in detail, and then give response on your questions.
> A lookup join is typically used to enrich a table with data that is queried
> from an external system. Many Lookup table sources introduce cache in order
> to reduce the RPC call, such as JDBC, CSV, HBase connectors.
> For those connectors, we could raise cache hit ratio by routing the same
> lookup keys to the same task instance. This is the purpose of
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> .
> Other cases might benefit from Hash distribution, such as batch hash join
> as you mentioned. It is a cool idea, however it is not the purpose of this
> FLIP, we could discuss this in FLINK-20670
> .
>
> > - When I was reading about this topic [1] I was wondering if this feature
> would be more beneficial for bounded use cases and not so much for
> unbounded use cases. What do you think?
>
> As mentioned before, the purpose of Hash Lookup Join is to increase the
> cache hit ratio which is different from Oracle Hash Join. However we could
> use the similar hint syntax.
>
> > - If I look at the current documentation for SQL Hints in Flink [2], I
> notice that all of the hints there are located at the end of the SQL
> statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> keyword. Can we somehow make this consistent for the user? Or should the
> user be able to specify hints anywhere in its SQL statement?
>
> Calcite supports hints in two locations [3]:
> Query Hint: right after the SELECT keyword;
> Table Hint: right after the referenced table name.
> Now Flink has supported dynamic table options based on the Hint framework
> of Calcite which is mentioned in doc[2].
> Besides, query hints are also important, it could give a hint for
> optimizers to choose a better plan. Almost all popular databases and
> big-data engines support sql query hints, such as oracle, hive, spark and
> so on.
> I think using query hints in this case is more natural for users, WDYT?
>
> I have updated the motivation part in the FLIP,
> Thanks for the feedback!
>
> [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> [3] https://calcite.apache.org/docs/reference.html#sql-hints
>
> Best,
> Jing Zhang
>
> Martijn Visser  于2021年12月28日周二 22:02写道:
>
> > Hi Jing,
> >
> > Thanks a lot for the explanation and the FLIP. I definitely learned
> > something when reading more about `use_hash`. My interpretation would be
> > that the primary benefit of a hash lookup join would be improved
> > performance by allowing the user to explicitly optimise the planner.
> >
> > I have a couple of questions:
> >
> > - When I was reading about this topic [1] I was wondering if this feature
> > would be more beneficial for bounded use cases and not so much for
> > unbounded use cases. What do you think?
> > - If I look at the current documentation for SQL Hints in Flink [2], I
> > notice that all of the hints there are located at the end of the SQL
> > statement. In the FLIP, the use_hash is defined directly after the
> 'SELECT'
> > keyword. Can we somehow make this consistent for the user? Or should the
> > user be able to specify hints anywhere in its SQL statement?
> >
> > Best regards,
> >
> > Martijn
> >
> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > [2]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> >
> >
> > On Tue, 28 Dec 2021 at 08:17, Jing Zhang  wrote:
> >
> > > Hi everyone,
> > > Look up join
> > > <
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> > > >[1]
> > > is
> > > commonly used feature in Flink SQL. We have 

Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-28 Thread zst...@163.com
Hi Jing,


Thanks very much for your FLIP. I have some points:


- How shall we deal with CDC data? If there is CDC data in the pipeline, IMHO, 
shuffle by join key will cause CDC data disorder. Will it be better to use 
primary key in this case?


- Can the shuffle keys be customized when users have knowledge about the 
distribution of the data?


- Some connectors, such as Hive, cache all data in the LookupFunction. How can we 
decrease the cached data size if the data can be shuffled?


Best regards,


Yuan
On 12/28/2021 15:11,Jing Zhang wrote:
Hi everyone,
Look up join
[1]
is
commonly used feature in Flink SQL. We have received many optimization
requirements on look up join. For example:
1. Enforces left side of lookup join do a hash partitioner to raise cache
hint ratio
2. Solves the data skew problem after introduces hash lookup join
3. Enables mini-batch optimization to reduce RPC call

Next we will solve these problems one by one. Firstly,  we would focus on
point 1, and continue to discuss point 2 and point 3 later.

There are many similar requirements from user mail list and JIRA about hash
Lookup Join, for example:
1. FLINK-23687  -
Introduce partitioned lookup join to enforce input of LookupJoin to hash
shuffle by lookup keys
2. FLINK-25396  -
lookupjoin source table for pre-partitioning
3. FLINK-25262  -
Support to send data to lookup table for KeyGroupStreamPartitioner way for
SQL.

In this FLIP, I would like to start a discussion about Hash Lookup Join.
The core idea is introducing a 'USE_HASH' hint in query.  This syntax is
directly user-oriented and therefore requires careful design.
There are two ways about how to propagate this hint to LookupJoin in
optimizer. We need further discussion to do final decide. Anyway, the
difference between the two solution is only about the internal
implementation and has no impact on the user.

For more detail on the proposal:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join


Looking forward to your feedback, thanks.

Best,
Jing Zhang

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2021-12-28 Thread Nicholas Jiang
Hi David,

Thanks for your feedback on the FLIP. I have addressed the comments above and would 
like to share my thoughts on the question you mentioned:

*How will this work with the OperatorCoordinator for re-processing of the 
historical data?*

The OperatorCoordinator checkpoints the full set of PatternProcessor data. 
For reprocessing of historical data, you can read the PatternProcessor snapshots 
saved in a certain historical checkpoint and then reprocess the historical data 
with those restored PatternProcessors.
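A minimal sketch of this coordinator-side snapshot/restore idea (the 
PatternProcessorDescriptor type, the Java serialization and the method names below 
are all hypothetical, purely for illustration; they are not the FLIP-200 API):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified descriptor of one pattern processor.
class PatternProcessorDescriptor implements Serializable {
    final String id;
    final int version;
    final String patternDefinition;

    PatternProcessorDescriptor(String id, int version, String patternDefinition) {
        this.id = id;
        this.version = version;
        this.patternDefinition = patternDefinition;
    }
}

final class PatternProcessorSnapshots {

    // Called when the coordinator takes part in a checkpoint: serialize the full set.
    static byte[] snapshot(List<PatternProcessorDescriptor> processors) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ArrayList<>(processors));
        }
        return bytes.toByteArray();
    }

    // Called when resetting to an older checkpoint, e.g. to reprocess historical data.
    @SuppressWarnings("unchecked")
    static List<PatternProcessorDescriptor> restore(byte[] checkpointData)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(checkpointData))) {
            return (List<PatternProcessorDescriptor>) in.readObject();
        }
    }

    private PatternProcessorSnapshots() {}
}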

About the side-input (custom source / operator + broadcast), Becket has already 
given an explanation of the OperatorCoordinator vs. side-input / broadcast stream 
trade-off. Please feel free to share your thoughts on this.

Best,
Nicholas Jiang

On 2021/12/21 08:24:53 David Morávek wrote:
> Hi Yunfeng,
> 
> thanks for drafting this FLIP, this will be a great addition into the CEP
> toolbox!
> 
> Apart from running user code in JM, which want to avoid in general, I'd
> have one more another concern about using the OperatorCoordinator and that
> is re-processing of the historical data. Any thoughts about how this will
> work with the OC?
> 
> I have a slight feeling that a side-input (custom source / operator +
> broadcast) would a better fit for this case. This would simplify the
> consistency concerns (watermarks + pushback) and the re-processing of
> historical data.
> 
> Best,
> D.
> 
> 
> On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang 
> wrote:
> 
> > Hi Konstantin, Martijn
> >
> > Thanks for the detailed feedback in the discussion. What I still have left
> > to answer/reply to:
> >
> > -- Martijn: Just to be sure, this indeed would mean that if for whatever
> > reason the heartbeat timeout, it would crash the job, right?
> >
> > IMO, if for whatever reason the heartbeat timeout, it couldn't check the
> > PatternProcessor consistency between the OperatorCoordinator and the
> > subtasks so that the job would be crashed.
> >
> > -- Konstantin: What I was concerned about is that we basically let users
> > run a UserFunction in the OperatorCoordinator, which it does not seem to
> > have been designed for.
> >
> > In general, we have reached an agreement on the design of this FLIP, but
> > there are some concerns on the OperatorCoordinator, about whether basically
> > let users run a UserFunction in the OperatorCoordinator is designed for
> > OperatorCoordinator. We would like to invite Becket Qin who is the author
> > of OperatorCoordinator to help us to answer this concern.
> >
> > Best,
> > Nicholas Jiang
> >
> >
> > On 2021/12/20 10:07:14 Martijn Visser wrote:
> > > Hi all,
> > >
> > > Really like the discussion on this topic moving forward. I really think
> > > this feature will be much appreciated by the Flink users. What I still
> > have
> > > left to answer/reply to:
> > >
> > > -- Good point. If for whatever reason the different taskmanagers can't
> > get
> > > the latest rule, the Operator Coordinator could send a heartbeat to all
> > > taskmanagers with the latest rules and check the heartbeat response from
> > > all the taskmanagers whether the latest rules of the taskmanager is equal
> > > to these of the Operator Coordinator.
> > >
> > > Just to be sure, this indeed would mean that if for whatever reason the
> > > heartbeat timeout, it would crash the job, right?
> > >
> > > -- We have consided about the solution mentioned above. In this
> > solution, I
> > > have some questions about how to guarantee the consistency of the rule
> > > between each TaskManager. By having a coodinator in the JobManager to
> > > centrally manage the latest rules, the latest rules of all TaskManagers
> > are
> > > consistent with those of the JobManager, so as to avoid the
> > inconsistencies
> > > that may be encountered in the above solution. Can you introduce how this
> > > solution guarantees the consistency of the rules?
> > >
> > > The consistency that we could guarantee was based on how often each
> > > TaskManager would do a refresh and how often we would accept a refresh to
> > > fail. We set the refresh time to a relatively short one (30 seconds) and
> > > maximum failures to 3. That meant that we could guarantee that rules
> > would
> > > be updated in < 2 minutes or else the job would crash. That was
> > sufficient
> > > for our use cases. This also really depends on how big your cluster is. I
> > > can imagine that if you have a large scale cluster that you want to run,
> > > you don't want to DDOS the backend system where you have your rules
> > stored.
> > >
> > > -- In summary, the current design is that JobManager tells all
> > TaskManagers
> > > the latest rules through OperatorCoodinator, and will initiate a
> > heartbeat
> > > to check whether the latest rules on each TaskManager are consistent. We
> > > will describe how to deal with the Failover scenario in more detail on
> > FLIP.
> > >
> > > Thanks for that. I think having the JobManager tell the 

[jira] [Created] (FLINK-25469) FlinkKafkaProducerITCase.testScaleUpAfterScalingDown fails on AZP

2021-12-28 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25469:
-

 Summary: FlinkKafkaProducerITCase.testScaleUpAfterScalingDown 
fails on AZP
 Key: FLINK-25469
 URL: https://issues.apache.org/jira/browse/FLINK-25469
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Till Rohrmann
 Fix For: 1.15.0


The test {{FlinkKafkaProducerITCase.testScaleUpAfterScalingDown}} fails with

{code}
Dec 28 15:21:42 [ERROR] 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testScaleUpAfterScalingDown
  Time elapsed: 138.55 s  <<< FAILURE!
Dec 28 15:21:42 java.lang.AssertionError: Expected elements: <[0, 1, 2, 3, 4, 
5, 6, 7, 8]>, but was: elements: <[0, 1, 2, 3, 4, 5, 6, 0, 1, 2, 3, 4, 5, 6, 7, 
8, 0, 1, 2, 3, 4, 5, 6, 7, 8, 0, 1, 2, 3, 4, 5, 6, 7, 8]>
Dec 28 15:21:42 at org.junit.Assert.fail(Assert.java:89)
Dec 28 15:21:42 at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.assertExactlyOnceForTopic(KafkaTestBase.java:331)
Dec 28 15:21:42 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testScaleUpAfterScalingDown(FlinkKafkaProducerITCase.java:460)
Dec 28 15:21:42 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Dec 28 15:21:42 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Dec 28 15:21:42 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Dec 28 15:21:42 at java.lang.reflect.Method.invoke(Method.java:498)
Dec 28 15:21:42 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Dec 28 15:21:42 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Dec 28 15:21:42 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Dec 28 15:21:42 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Dec 28 15:21:42 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Dec 28 15:21:42 at 
org.apache.flink.testutils.junit.RetryRule$RetryOnFailureStatement.evaluate(RetryRule.java:135)
Dec 28 15:21:42 at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
Dec 28 15:21:42 at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Dec 28 15:21:42 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Dec 28 15:21:42 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Dec 28 15:21:42 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Dec 28 15:21:42 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Dec 28 15:21:42 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Dec 28 15:21:42 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
Dec 28 15:21:42 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Dec 28 15:21:42 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Dec 28 15:21:42 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
Dec 28 15:21:42 at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
Dec 28 15:21:42 at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
Dec 28 15:21:42 at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
Dec 28 15:21:42 at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
Dec 28 15:21:42 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
Dec 28 15:21:42 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
Dec 28 15:21:42 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
Dec 28 15:21:42

Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-28 Thread Jing Zhang
Hi Martijn,
Thanks a lot for your attention.
I'm sorry I didn't explain the motivation clearly. I would like to explain
it in detail and then respond to your questions.
A lookup join is typically used to enrich a table with data that is queried
from an external system. Many lookup table sources introduce a cache in order
to reduce the number of RPC calls, such as the JDBC, CSV and HBase connectors.
For those connectors, we can raise the cache hit ratio by routing the same
lookup keys to the same task instance (see the sketch below). This is the purpose of
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
Other cases might benefit from hash distribution as well, such as the batch hash
join you mentioned. It is a cool idea; however, it is not the purpose of this
FLIP, and we could discuss it in FLINK-20670.
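To illustrate why the routing helps, here is a small, self-contained sketch (plain
Java, not Flink API; the keys and parallelism are made up). With a
rebalance/round-robin distribution every subtask eventually sees every lookup key,
so each per-task cache must cover the whole key space; with hash distribution on
the lookup key, each subtask only ever sees a disjoint subset of the keys, so the
same per-task cache covers fewer distinct keys and the hit ratio goes up.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LookupRoutingSketch {

    public static void main(String[] args) {
        List<String> lookupKeys = List.of("user_1", "user_2", "user_3", "user_1",
                "user_2", "user_1", "user_4", "user_3", "user_1", "user_2");
        int parallelism = 2;

        Set<String>[] roundRobin = newSets(parallelism);
        Set<String>[] hashed = newSets(parallelism);

        for (int i = 0; i < lookupKeys.size(); i++) {
            String key = lookupKeys.get(i);
            // Rebalance: records are spread over subtasks regardless of the key.
            roundRobin[i % parallelism].add(key);
            // Hash partitioning on the lookup key: the same key always goes to the
            // same subtask, so each subtask sees a disjoint subset of the keys.
            hashed[Math.floorMod(key.hashCode(), parallelism)].add(key);
        }

        for (int task = 0; task < parallelism; task++) {
            System.out.printf("task %d: round-robin sees %d distinct keys, hash sees %d%n",
                    task, roundRobin[task].size(), hashed[task].size());
        }
    }

    @SuppressWarnings("unchecked")
    private static Set<String>[] newSets(int n) {
        Set<String>[] sets = new Set[n];
        for (int i = 0; i < n; i++) {
            sets[i] = new HashSet<>();
        }
        return sets;
    }
}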

> - When I was reading about this topic [1] I was wondering if this feature
would be more beneficial for bounded use cases and not so much for
unbounded use cases. What do you think?

As mentioned before, the purpose of hash lookup join is to increase the
cache hit ratio, which is different from Oracle's hash join. However, we could
use a similar hint syntax.

> - If I look at the current documentation for SQL Hints in Flink [2], I
notice that all of the hints there are located at the end of the SQL
statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
keyword. Can we somehow make this consistent for the user? Or should the
user be able to specify hints anywhere in its SQL statement?

Calcite supports hints in two locations [3]:
Query Hint: right after the SELECT keyword;
Table Hint: right after the referenced table name.
Flink already supports dynamic table options based on the hint framework
of Calcite, as mentioned in doc [2].
Besides, query hints are also important: they give the optimizer a hint to
choose a better plan. Almost all popular databases and
big-data engines support SQL query hints, such as Oracle, Hive, Spark and
so on.
I think using a query hint is more natural for users in this case, WDYT?

I have updated the motivation part in the FLIP.
Thanks for the feedback!

[1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
[2]
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
[3] https://calcite.apache.org/docs/reference.html#sql-hints

Best,
Jing Zhang

Martijn Visser wrote on Tue, Dec 28, 2021 at 22:02:

> Hi Jing,
>
> Thanks a lot for the explanation and the FLIP. I definitely learned
> something when reading more about `use_hash`. My interpretation would be
> that the primary benefit of a hash lookup join would be improved
> performance by allowing the user to explicitly optimise the planner.
>
> I have a couple of questions:
>
> - When I was reading about this topic [1] I was wondering if this feature
> would be more beneficial for bounded use cases and not so much for
> unbounded use cases. What do you think?
> - If I look at the current documentation for SQL Hints in Flink [2], I
> notice that all of the hints there are located at the end of the SQL
> statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> keyword. Can we somehow make this consistent for the user? Or should the
> user be able to specify hints anywhere in its SQL statement?
>
> Best regards,
>
> Martijn
>
> [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>
>
> On Tue, 28 Dec 2021 at 08:17, Jing Zhang  wrote:
>
> > Hi everyone,
> > Look up join
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> > >[1]
> > is
> > commonly used feature in Flink SQL. We have received many optimization
> > requirements on look up join. For example:
> > 1. Enforces left side of lookup join do a hash partitioner to raise cache
> > hint ratio
> > 2. Solves the data skew problem after introduces hash lookup join
> > 3. Enables mini-batch optimization to reduce RPC call
> >
> > Next we will solve these problems one by one. Firstly,  we would focus on
> > point 1, and continue to discuss point 2 and point 3 later.
> >
> > There are many similar requirements from user mail list and JIRA about
> hash
> > Lookup Join, for example:
> > 1. FLINK-23687  -
> > Introduce partitioned lookup join to enforce input of LookupJoin to hash
> > shuffle by lookup keys
> > 2. FLINK-25396  -
> > lookupjoin source table for pre-partitioning
> > 3. FLINK-25262  -
> > Support to send data to lookup table for KeyGroupStreamPartitioner way
> for
> > SQL.
> >
> > In this FLIP, I would like to start a discussion about Hash Lookup Join.
> > The core idea is introducing a 'USE_HASH' hint in query.  This syntax is
> 

[jira] [Created] (FLINK-25468) Local recovery fails if local state storage and RocksDB working directory are not on the same volume

2021-12-28 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25468:
-

 Summary: Local recovery fails if local state storage and RocksDB 
working directory are not on the same volume
 Key: FLINK-25468
 URL: https://issues.apache.org/jira/browse/FLINK-25468
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.14.2, 1.13.5, 1.15.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.15.0, 1.13.6, 1.14.3


Local recovery with RocksDB fails if the state storage directory is not on the 
same volume as RocksDB's working directory. The reason is that the 
{{RocksDBHandle}} only tries to hard link the RocksDB files when calling 
{{restoreInstanceDirectoryFromPath}}. If hard linking is not supported, then 
the operation fails.

In order to harden this behaviour, I suggest falling back to copying the files 
if hard linking fails.
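
A minimal sketch of the suggested fallback (illustrative only, not the actual 
{{RocksDBHandle}} code; it just shows the proposed behaviour with plain 
{{java.nio.file}}):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class LinkOrCopy {

    // Try to hard-link the existing file into the instance directory; if hard linking
    // is not possible (e.g. source and target are on different volumes), copy instead.
    static void linkOrCopy(Path source, Path target) throws IOException {
        try {
            Files.createLink(target, source);
        } catch (IOException | UnsupportedOperationException e) {
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    private LinkOrCopy() {}
}
{code}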



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[DISCUSS] FLIP-201: Persist local state in working directory

2021-12-28 Thread Till Rohrmann
Hi everyone,

I would like to start a discussion about using the working directory to
persist local state for faster recovery (FLIP-201) [1]. Persisting the
local state will be beneficial if a crashed process is restarted with the
same working directory. In this case, Flink does not have to download the
state artifacts again and can recover locally.

A POC can be found here [2].

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/wJuqCw
[2] https://github.com/tillrohrmann/flink/tree/FLIP-201

Cheers,
Till


Re: [DISCUSS] Introduce Hash Lookup Join

2021-12-28 Thread Martijn Visser
Hi Jing,

Thanks a lot for the explanation and the FLIP. I definitely learned
something when reading more about `use_hash`. My interpretation would be
that the primary benefit of a hash lookup join would be improved
performance by allowing the user to explicitly optimise the planner.

I have a couple of questions:

- When I was reading about this topic [1] I was wondering if this feature
would be more beneficial for bounded use cases and not so much for
unbounded use cases. What do you think?
- If I look at the current documentation for SQL Hints in Flink [2], I
notice that all of the hints there are located at the end of the SQL
statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
keyword. Can we somehow make this consistent for the user? Or should the
user be able to specify hints anywhere in its SQL statement?

Best regards,

Martijn

[1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
[2]
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/


On Tue, 28 Dec 2021 at 08:17, Jing Zhang  wrote:

> Hi everyone,
> Look up join
> <
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> >[1]
> is
> commonly used feature in Flink SQL. We have received many optimization
> requirements on look up join. For example:
> 1. Enforces left side of lookup join do a hash partitioner to raise cache
> hint ratio
> 2. Solves the data skew problem after introduces hash lookup join
> 3. Enables mini-batch optimization to reduce RPC call
>
> Next we will solve these problems one by one. Firstly,  we would focus on
> point 1, and continue to discuss point 2 and point 3 later.
>
> There are many similar requirements from user mail list and JIRA about hash
> Lookup Join, for example:
> 1. FLINK-23687  -
> Introduce partitioned lookup join to enforce input of LookupJoin to hash
> shuffle by lookup keys
> 2. FLINK-25396  -
> lookupjoin source table for pre-partitioning
> 3. FLINK-25262  -
> Support to send data to lookup table for KeyGroupStreamPartitioner way for
> SQL.
>
> In this FLIP, I would like to start a discussion about Hash Lookup Join.
> The core idea is introducing a 'USE_HASH' hint in query.  This syntax is
> directly user-oriented and therefore requires careful design.
> There are two ways about how to propagate this hint to LookupJoin in
> optimizer. We need further discussion to do final decide. Anyway, the
> difference between the two solution is only about the internal
> implementation and has no impact on the user.
>
> For more detail on the proposal:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>
>
> Looking forward to your feedback, thanks.
>
> Best,
> Jing Zhang
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>


[VOTE] Apache Flink ML Release 2.0.0, release candidate #2

2021-12-28 Thread Yun Gao
Hi everyone,
Please review and vote on the release candidate #2 for the version 2.0.0 of 
Apache Flink ML,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)
**Testing Guideline** 
You can find here [1] a page in the project wiki on instructions for testing. 
To cast a vote, it is not necessary to perform all listed checks, but please 
mention which checks you have performed when voting. 
**Release Overview**
As an overview, the release consists of the following:
a) Flink ML source release to be deployed to dist.apache.org
b) Flink ML Python source distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository
**Staging Areas to Review**
The staging areas containing the above mentioned artifacts are as follows, for 
your review:
* All artifacts for a) and b) can be found in the corresponding dev repository 
at dist.apache.org [2], which are signed with the key with fingerprint 
CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3]
* All artifacts for c) can be found at the Apache Nexus Repository [4]
Other links for your review:
* JIRA release notes [5]
* Source code tag "release-2.0.0-rc2" [6]
* PR to update the website Downloads page to include Flink ML links [7]
**Vote Duration**
The voting time will run for at least 72 hours.
It is adopted by majority approval, with at least 3 PMC affirmative votes.
Thanks,
Dong and Yun
[1] 
https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc2/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1475
[5] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351079
[6] https://github.com/apache/flink-ml/releases/tag/release-2.0.0-rc2
[7] https://github.com/apache/flink-web/pull/493


[jira] [Created] (FLINK-25467) Job failed during initialization of JobManager

2021-12-28 Thread tim.yuan (Jira)
tim.yuan created FLINK-25467:


 Summary: Job failed during initialization of JobManager
 Key: FLINK-25467
 URL: https://issues.apache.org/jira/browse/FLINK-25467
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.14.2
Reporter: tim.yuan


HA deployment

The standby (slave) fails when opening/running the task!

 

2021-12-28 19:13:03,992 WARN  
org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] - The 
connection was unexpectedly closed by the client.
2021-12-28 19:13:44,172 ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - Unhandled 
exception.
java.util.concurrent.CancellationException: null
    at 
java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276) 
~[?:1.8.0_312]
    at 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
    at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
[flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
    at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
 [flink-dist_2.12-1.14.0.jar:1.14.0]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #1

2021-12-28 Thread Yun Gao
Ah, sorry, I used the wrong key... I'll cancel this candidate and 
initiate a new release candidate...



 --Original Mail --
Sender:Dong Lin 
Send Date:Tue Dec 28 16:12:29 2021
Recipients:Yun Gao 
CC:dev 
Subject:Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #1

Thank you Yun for releasing Flink ML!

I downloaded the artifacts from [1], installed the keys from [2] and then tried 
"gpg --verify apache-flink-ml-2.0.0.tar.gz.asc apache-flink-ml-2.0.0.tar.gz". 
It seems that the signature could not be verified due to "No public key".

[1] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc1
[2] https://dist.apache.org/repos/dist/release/flink/KEYS


On Tue, Dec 28, 2021 at 11:45 AM Yun Gao  wrote:

Hi everyone,

Please review and vote on the release candidate #1 for the version 2.0.0 of 
Apache Flink ML,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)
**Testing Guideline** 

You can find here [1] a page in the project wiki on instructions for testing. 
To cast a vote, it is not necessary to perform all listed checks, but please 
mention which checks you have performed when voting. 

**Release Overview**

As an overview, the release consists of the following:
a) Flink ML source release to be deployed to dist.apache.org
b) Flink ML Python source distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository
**Staging Areas to Review**

The staging areas containing the above mentioned artifacts are as follows, for 
your review:
* All artifacts for a) and b) can be found in the corresponding dev repository 
at dist.apache.org [2], 
which are signed with the key with fingerprint 
CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3]
* All artifacts for c) can be found at the Apache Nexus Repository [4]

Other links for your review:
* JIRA release notes [5]
* Source code tag "release-2.0.0-rc1" [6]
* PR to update the website Downloads page to include Flink ML links [7]
**Vote Duration**
The voting time will run for at least 72 hours.
It is adopted by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Dong and Yun
[1] 
https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc1/
[3]  https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1473/
[5] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351079
[6] https://github.com/apache/flink-ml/releases/tag/release-2.0.0-rc1
[7] https://github.com/apache/flink-web/pull/493







Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-28 Thread Till Rohrmann
Hi Jingsong,

I think that developing flink-dynamic-storage as a separate sub project is
a very good idea since it allows us to move a lot faster and decouple
releases from Flink. Hence big +1.

Do we want to name it flink-dynamic-storage or shall we use a more
descriptive name? dynamic-storage sounds a bit generic to me and I wouldn't
know that this has something to do with letting Flink manage your tables
and their storage. I don't have a very good idea but maybe we can call it
flink-managed-tables, flink-warehouse, flink-olap or so.

Cheers,
Till

On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
wrote:

> Hi Jingsong,
>
> That sounds promising! +1 from my side to continue development under
> flink-dynamic-storage as a Flink subproject. I think having a more in-depth
> interface will benefit everyone.
>
> Best regards,
>
> Martijn
>
> On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:
>
>> Hi all,
>>
>> After some experimentation, we felt there was no problem with putting the
>> dynamic storage outside of Flink, and it also allowed us to design the
>> interface in more depth.
>>
>> What do you think? If there is no problem, I am asking for the PMC's help
>> here: we want to propose flink-dynamic-storage as a Flink subproject,
>> and we want to build the project under Apache.
>>
>> Best,
>> Jingsong
>>
>>
>> On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
>> wrote:
>> >
>> > Hi Stephan,
>> >
>> > Thanks for your reply.
>> >
>> > Data never expires automatically.
>> >
>> > If there is a need for data retention, the user can choose one of the
>> > following options:
>> > - In the SQL for querying the managed table, users filter the data by
>> > themselves
>> > - Define the time partition, and users can delete the expired
>> > partition by themselves (DROP PARTITION ...)
>> > - In a future version, we will support the "DELETE FROM" statement, so
>> > users can delete the expired data according to their conditions.
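>> >
>> > As a rough, hypothetical sketch of the latter two options (the table name,
>> > partition column and values are made up for illustration only):
>> >
>> > -- option 2: drop an expired time partition of the managed table
>> > ALTER TABLE managed_sink DROP PARTITION (dt = '2021-12-01');
>> > -- option 3 (planned "DELETE FROM" support): delete expired rows by condition
>> > DELETE FROM managed_sink WHERE dt < '2021-12-14';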
>> >
>> > So to answer your question:
>> >
>> > > Will the VMQ send retractions so that the data will be removed from
>> the table (via compactions)?
>> >
>> > The current implementation does not send retractions, which I think
>> > theoretically should be sent; currently, the user can filter the data by
>> > subsequent conditions.
>> > And yes, the subscriber would not see a strictly correct result. I
>> > think this is something we can improve for Flink SQL.
>> >
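>> > As a hypothetical sketch of such a filter on the reading side (the table
>> > and column names are made up for illustration and are not part of the
>> > FLIP):
>> >
>> > SELECT window_end, cnt
>> > FROM the_managed_table
>> > WHERE window_end >= NOW() - INTERVAL '14' DAY;
>> >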
>> > > Do we want time retention semantics handled by the compaction?
>> >
>> > Currently, no. Data never expires automatically.
>> >
>> > > Do we want to declare those types of queries "out of scope" initially?
>> >
>> > I think we want users to be able to use the three options above to
>> > accomplish their requirements.
>> >
>> > I will update FLIP to make the definition clearer and more explicit.
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 
>> wrote:
>> > >
>> > > Thanks for digging into this.
>> > > Regarding this query:
>> > >
>> > > INSERT INTO the_table
>> > >   SELECT window_end, COUNT(*)
>> > > FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5'
>> MINUTES))
>> > > GROUP BY window_end
>> > >   HAVING now() - window_end <= INTERVAL '14' DAYS;
>> > >
>> > > I am not sure I understand what the conclusion is on the data
>> retention question, where the continuous streaming SQL query has retention
>> semantics. I think we would need to answer the following questions (I will
>> call the query that computed the managed table the "view materializer
>> query" - VMQ).
>> > >
>> > > (1) I guess the VMQ will send no updates for windows once the
>> "retention period" (14 days) is over, as you said. That makes sense.
>> > >
>> > > (2) Will the VMQ send retractions so that the data will be removed
>> from the table (via compactions)?
>> > >   - if yes, this seems semantically better for users, but it will be
>> expensive to keep the timers for retractions.
>> > >   - if not, we can still solve this by adding filters to queries
>> against the managed table, as long as these queries are in Flink.
>> > >   - any subscriber to the changelog stream would not see strictly a
>> correct result if we are not doing the retractions
>> > >
>> > > (3) Do we want time retention semantics handled by the compaction?
>> > >   - if we say that we lazily apply the deletes in the queries that
>> read the managed tables, then we could also age out the old data during
>> compaction.
>> > >   - that is cheap, but it might be too much of a special case to be
>> very relevant here.
>> > >
>> > > (4) Do we want to declare those types of queries "out of scope"
>> initially?
>> > >   - if yes, how many users are we affecting? (I guess probably not
>> many, but would be good to hear some thoughts from others on this)
>> > >   - should we simply reject such queries in the optimizer as "not
>> possible to support in managed tables"? I would suggest that; it is always
>> better to tell users exactly what works and what does not, rather than
>> letting them be surprised in the end. Users can still remove the HAVING
>> clause if they want the query to run, and that would be better than if the
>> VMQ just silently ignores those semantics.

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-28 Thread Martijn Visser
Hi Jingsong,

That sounds promising! +1 from my side to continue development under
flink-dynamic-storage as a Flink subproject. I think having a more in-depth
interface will benefit everyone.

Best regards,

Martijn

On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:

> Hi all,
>
> After some experimentation, we felt there was no problem with putting the
> dynamic storage outside of Flink, and it also allowed us to design the
> interface in more depth.
>
> What do you think? If there is no problem, I am asking for the PMC's help
> here: we want to propose flink-dynamic-storage as a Flink subproject,
> and we want to build the project under Apache.
>
> Best,
> Jingsong
>
>
> On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
> wrote:
> >
> > Hi Stephan,
> >
> > Thanks for your reply.
> >
> > Data never expires automatically.
> >
> > If there is a need for data retention, the user can choose one of the
> > following options:
> > - In the SQL for querying the managed table, users filter the data by
> > themselves
> > - Define the time partition, and users can delete the expired
> > partition by themselves (DROP PARTITION ...)
> > - In a future version, we will support the "DELETE FROM" statement, so
> > users can delete the expired data according to their conditions.
> >
> > So to answer your question:
> >
> > > Will the VMQ send retractions so that the data will be removed from
> the table (via compactions)?
> >
> > The current implementation does not send retractions, which I think
> > theoretically should be sent; currently, the user can filter the data by
> > subsequent conditions.
> > And yes, the subscriber would not see a strictly correct result. I
> > think this is something we can improve for Flink SQL.
> >
> > > Do we want time retention semantics handled by the compaction?
> >
> > Currently, no. Data never expires automatically.
> >
> > > Do we want to declare those types of queries "out of scope" initially?
> >
> > I think we want users to be able to use the three options above to
> > accomplish their requirements.
> >
> > I will update FLIP to make the definition clearer and more explicit.
> >
> > Best,
> > Jingsong
> >
> > On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 
> wrote:
> > >
> > > Thanks for digging into this.
> > > Regarding this query:
> > >
> > > INSERT INTO the_table
> > >   SELECT window_end, COUNT(*)
> > > FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5'
> MINUTES))
> > > GROUP BY window_end
> > >   HAVING now() - window_end <= INTERVAL '14' DAYS;
> > >
> > > I am not sure I understand what the conclusion is on the data
> retention question, where the continuous streaming SQL query has retention
> semantics. I think we would need to answer the following questions (I will
> call the query that computed the managed table the "view materializer
> query" - VMQ).
> > >
> > > (1) I guess the VMQ will send no updates for windows once the
> "retention period" (14 days) is over, as you said. That makes sense.
> > >
> > > (2) Will the VMQ send retractions so that the data will be removed
> from the table (via compactions)?
> > >   - if yes, this seems semantically better for users, but it will be
> expensive to keep the timers for retractions.
> > >   - if not, we can still solve this by adding filters to queries
> against the managed table, as long as these queries are in Flink.
> > >   - any subscriber to the changelog stream would not see strictly a
> correct result if we are not doing the retractions
> > >
> > > (3) Do we want time retention semantics handled by the compaction?
> > >   - if we say that we lazily apply the deletes in the queries that
> read the managed tables, then we could also age out the old data during
> compaction.
> > >   - that is cheap, but it might be too much of a special case to be
> very relevant here.
> > >
> > > (4) Do we want to declare those types of queries "out of scope"
> initially?
> > >   - if yes, how many users are we affecting? (I guess probably not
> many, but would be good to hear some thoughts from others on this)
> > >   - should we simply reject such queries in the optimizer as "not
> possible to support in managed tables"? I would suggest that; it is always
> better to tell users exactly what works and what does not, rather than letting
> them be surprised in the end. Users can still remove the HAVING clause if they want
> the query to run, and that would be better than if the VMQ just silently
> ignores those semantics.
> > >
> > > Thanks,
> > > Stephan
> > >
> >
> >
> > --
> > Best, Jingsong Lee
>
>
>
> --
> Best, Jingsong Lee
>


Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #1

2021-12-28 Thread Dong Lin
Thank you Yun for releasing Flink ML!

I downloaded the artifacts from [1], installed the keys from [2] and then
tried "gpg --verify apache-flink-ml-2.0.0.tar.gz.asc
apache-flink-ml-2.0.0.tar.gz". It seems that the signature could not be
verified due to "No public key".

[1] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc1
[2] https://dist.apache.org/repos/dist/release/flink/KEYS


On Tue, Dec 28, 2021 at 11:45 AM Yun Gao  wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #1 for the version 2.0.0
> of Apache Flink ML,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> **Testing Guideline**
>
> You can find here [1] a page in the project wiki on instructions for
> testing.
> To cast a vote, it is not necessary to perform all listed checks, but
> please
> mention which checks you have performed when voting.
>
> **Release Overview**
>
> As an overview, the release consists of the following:
> a) Flink ML source release to be deployed to dist.apache.org
> b) Flink ML Python source distributions to be deployed to PyPI
> c) Maven artifacts to be deployed to the Maven Central Repository
>
> **Staging Areas to Review**
>
> The staging areas containing the above mentioned artifacts are as follows,
> for your review:
> * All artifacts for a) and b) can be found in the corresponding dev
> repository at dist.apache.org [2],
> which are signed with the key with fingerprint
> CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3]
> * All artifacts for c) can be found at the Apache Nexus Repository [4]
>
> Other links for your review:
> * JIRA release notes [5]
> * Source code tag "release-2.0.0-rc1" [6]
> * PR to update the website Downloads page to include Flink ML links [7]
>
> **Vote Duration**
> The voting time will run for at least 72 hours.
> It is adopted by majority approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Dong and Yun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc1/
> [3]  https://dist.apache.org/repos/dist/release/flink/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1473/
> [5]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351079
> [6] https://github.com/apache/flink-ml/releases/tag/release-2.0.0-rc1
> [7] https://github.com/apache/flink-web/pull/493
>
>
>
>
>
>
>