[jira] [Created] (FLINK-21276) flink-statefun/statefun-sdk-java should mention Protobuf in NOTICE and bundle its license

2021-02-03 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-21276:
---

 Summary: flink-statefun/statefun-sdk-java should mention Protobuf 
in NOTICE and bundle its license
 Key: FLINK-21276
 URL: https://issues.apache.org/jira/browse/FLINK-21276
 Project: Flink
  Issue Type: Task
  Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: statefun-3.0.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-03 Thread godfrey he
Hi everyone,

Regarding "table.planner" and "table.execution-mode"
If we define that those two options are just used to initialize the
TableEnvironment, +1 for introducing table options instead of sql-client
options.

Regarding "the sql client, we will maintain two parsers", I want to give
more inputs:
We want to introduce sql-gateway into the Flink project (see FLIP-24 &
FLIP-91 for more info [1] [2]). In the "gateway" mode, the CLI client and
the gateway service will communicate through a REST API. The "ADD JAR
/local/path/jar" statement will be executed on the CLI client machine. So when we
submit a sql file which contains multiple statements, the CLI client needs
to pick out the "ADD JAR" lines, and the statements need to be submitted or
executed one by one to make sure the result is correct. The sql file may
look like:

SET xxx=yyy;
create table my_table ...;
create table my_sink ...;
ADD JAR /local/path/jar1;
create function my_udf as com.MyUdf;
insert into my_sink select ..., my_udf(xx) from ...;
REMOVE JAR /local/path/jar1;
drop function my_udf;
ADD JAR /local/path/jar2;
create function my_udf as com.MyUdf2;
insert into my_sink select ..., my_udf(xx) from ...;

The lines need to be split into multiple statements first in the CLI
client; there are two approaches:
1. The CLI client depends on the sql-parser: the sql-parser splits the
lines and tells which lines are "ADD JAR".
pro: there is only one parser
cons: it's a little heavy for the CLI client to depend on the sql-parser,
because the CLI client is just a simple tool which receives the user
commands and displays the result. Non-"ADD JAR" commands will be parsed
twice.

2. The CLI client splits the lines into multiple statements and finds the
ADD JAR command through regex matching.
pro: The CLI client is very light-weight.
cons: there are two parsers.

(personally, I prefer the second option)
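
For illustration, a minimal sketch of what the option 2 splitting could look
like (the class name, regex, and splitting logic below are illustrative
assumptions, not part of any proposal):

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class ClientStatementSplitter {

    // Matches e.g. "ADD JAR /local/path/jar1"; REMOVE JAR would be analogous.
    private static final Pattern ADD_JAR =
            Pattern.compile("^ADD\\s+JAR\\s+(\\S+)$", Pattern.CASE_INSENSITIVE);

    // Naive split on ';' -- a real implementation must also handle quoted
    // strings and comments that contain semicolons.
    public static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        for (String stmt : script.split(";")) {
            String trimmed = stmt.trim();
            if (!trimmed.isEmpty()) {
                statements.add(trimmed);
            }
        }
        return statements;
    }

    public static boolean isAddJar(String statement) {
        return ADD_JAR.matcher(statement.trim()).matches();
    }
}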

Regarding "SHOW or LIST JARS", I think we can support them both.
For the default dialect, we support SHOW JARS; if we switch to the hive
dialect, LIST JARS is also supported.


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Client+Gateway

Best,
Godfrey

Rui Li  wrote on Thu, Feb 4, 2021 at 10:40 AM:

> Hi guys,
>
> Regarding #3 and #4, I agree SHOW JARS is more consistent with other
> commands than LIST JARS. I don't have a strong opinion about REMOVE vs
> DELETE though.
>
> While flink doesn't need to follow hive syntax, as far as I know, most
> users who are requesting these features are previously hive users. So I
> wonder whether we can support both LIST/SHOW JARS and REMOVE/DELETE JARS
> as synonyms? It's just like lots of systems accept both EXIT and QUIT as
> the command to terminate the program. So if that's not hard to achieve, and
> will make users happier, I don't see a reason why we must choose one over
> the other.
>
> On Wed, Feb 3, 2021 at 10:33 PM Timo Walther  wrote:
>
> > Hi everyone,
> >
> > some feedback regarding the open questions. Maybe we can discuss the
> > `TableEnvironment.executeMultiSql` story offline to determine how we
> > proceed with this in the near future.
> >
> > 1) "whether the table environment has the ability to update itself"
> >
> > Maybe there was some misunderstanding. I don't think that we should
> > support `tEnv.getConfig.getConfiguration.setString("table.planner",
> > "old")`. Instead I'm proposing to support
> > `TableEnvironment.create(Configuration)` where planner and execution
> > mode are read immediately and subsequent changes to these options will
> > have no effect. We are doing it similarly in `new
> > StreamExecutionEnvironment(Configuration)`. These two ConfigOptions
> > must not be SQL Client specific but can be part of the core table code
> > base. Many users would like to get a 100% preconfigured environment from
> > just Configuration. And this is not possible right now. We can solve
> > both use cases in one change.
> >
> > 2) "the sql client, we will maintain two parsers"
> >
> > I remember we had some discussion about this and decided that we would
> > like to maintain only one parser. In the end it is "One Flink SQL" where
> > commands influence each other also with respect to keywords. It should
> > be fine to include the SQL Client commands in the Flink parser. Of
> > course the table environment would not be able to handle the `Operation`
> > instance that would be the result but we can introduce hooks to handle
> > those `Operation`s. Or we introduce parser extensions.
> >
> > Can we skip `table.job.async` in the first version? We should further
> > discuss whether we introduce a special SQL clause for wrapping async
> > behavior or if we use a config option? Esp. for streaming queries we
> > need to be careful and should force users to either "one INSERT INTO" or
> > "one STATEMENT SET".
> >
> > 3) 4) "HIVE also uses these commands"
> >
> > In general, Hive is not a good reference. Aligning the co

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-03 Thread Johannes Moser
Hello,

I will work with some users to get data on that.

Thanks, Joe

> On 03.02.2021, at 14:58, Stephan Ewen  wrote:
> 
> Hi all!
> 
> A quick thought on this thread: We see a typical stalemate here, as in so
> many discussions recently.
> One developer prefers it this way, another one another way. Both have
> pro/con arguments, it takes a lot of time from everyone, still there is
> little progress in the discussion.
> 
> Ultimately, this can only be decided by talking to the users. And it
> would also be the best way to ensure that what we build is the intuitive
> and expected way for users.
> The less the users are into the deep aspects of Flink SQL, the better they
> can mirror what a common user would expect (a power user will anyway
> figure it out).
> Let's find a person to drive that, spell it out in the FLIP as "semantics
> TBD", and focus on the implementation of the parts that are agreed upon.
> 
> For interviewing the users, here are some ideas for questions to look at:
>  - How do they view the trade-off between stable semantics vs.
> out-of-the-box magic (getting started faster).
>  - How comfortable are they realizing the different meaning of "now()" in
> a streaming versus batch context.
>  - What would be their expectation when moving a query with the time
> functions ("now()") from an unbounded stream (Kafka source without end
> offset) to a bounded stream (Kafka source with end offsets), which may
> switch execution to batch.
> 
> Best,
> Stephan
> 
> 
> On Tue, Feb 2, 2021 at 3:19 PM Jark Wu  wrote:
> 
>> Hi Fabian,
>> 
>> I think we have an agreement that the functions should be evaluated at
>> query start in batch mode.
>> Because all the other batch systems and traditional databases have this
>> behavior, which is standard SQL compliant.
>> 
>> *1. The different point of view is what's the behavior in streaming mode? *
>> 
>> From my point of view, I don't see any meaningful reason to evaluate at
>> query-start for a 365-day long-running streaming job.
>> And from my observation, CURRENT_TIMESTAMP is heavily used by Flink
>> streaming users and they expect the current behaviors.
>> The SQL standard only provides a guideline for traditional batch systems,
>> however Flink is a leading stream processing system
>> which is out of the scope of SQL standard, and Flink should define the
>> streaming standard. I think a standard should follow users' intuition.
>> Therefore, I think we don't need to be standard SQL compliant at this point
>> because users don't expect it.
>> Changing the behavior of the functions to evaluate at query start for
>> streaming mode will hurt most Flink SQL users and we have nothing to
>> gain,
>> we should avoid this.
>> 
>> *2. Does it break the unified streaming-batch semantics? *
>> 
>> I don't think so. First of all, what's the unified streaming-batch
>> semantic?
>> I think it means the *eventual result* instead of the *behavior*.
>> It's hard to say we have provided unified behavior for streaming and batch
>> jobs,
>> because, for example, unbounded aggregates behave very differently.
>> In batch mode, it only evaluates once for the bounded data and emits the
>> aggregate result once.
>> But in streaming mode, it evaluates for each row and emits the updated
>> result.
>> What we have always emphasized "unified streaming-batch semantics" is [1]
>> 
>>> a query produces exactly the same result regardless of whether its input is
>> static batch data or streaming data.
>> 
>> From my understanding, the "semantic" means the "eventual result".
>> And time functions are non-deterministic, so it's reasonable to get
>> different results for batch and streaming mode.
>> Therefore, I think it doesn't break the unified streaming-batch semantics
>> to evaluate per-record for streaming and
>> query-start for batch, as the semantics refer to the eventual result, not the behavior.
>> 
>> Best,
>> Jark
>> 
>> [1]: https://flink.apache.org/news/2017/04/04/dynamic-tables.html
>> 
>> On Tue, 2 Feb 2021 at 18:34, Fabian Hueske  wrote:
>> 
>>> Hi everyone,
>>> 
>>> Sorry for joining this discussion late.
>>> Let me give some thought to two of the arguments raised in this thread.
>>> 
>>> Time functions are inherently non-deterministic:
>>> --
>>> This is of course true, but IMO it doesn't mean that the semantics of
>> time
>>> functions do not matter.
>>> It makes a difference whether a function is evaluated once and its
>> result
>>> is reused or whether it is invoked for every record.
>>> Would you use the same logic to justify different behavior of RAND() in
>>> batch and streaming queries?
>>> 
>>> Provide the semantics that most users expect:
>>> --
>>> I don't think it is clear what most users expect, esp. if we also include
>>> future users (which we certainly want to gain) in this assessment.
>>> Our current users got used to the semantics that we introduced. So I
>>> wouldn't be surprised if they would say stick with the current semantics.
>>> However, we are also claiming stand

[jira] [Created] (FLINK-21275) DataStreamTests::test_key_by_on_connect_stream Fail

2021-02-03 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21275:
-

 Summary: DataStreamTests::test_key_by_on_connect_stream Fail
 Key: FLINK-21275
 URL: https://issues.apache.org/jira/browse/FLINK-21275
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.1
Reporter: Guowei Ma


=== short test summary info ===
FAILED pyflink/datastream/tests/test_data_stream.py::DataStreamTests::test_key_by_on_connect_stream
=== 1 failed, 660 passed, 23 skipped, 285 warnings in 334.60s (0:05:34) ===
ERROR: InvocationError for command 
/__w/2/s/flink-python/.tox/py35-cython/bin/pytest --durations=20 (exited with 
code 1) 
py36-cython create: /__w/2/s/flink-python/.tox/py36-cython 
py36-cython installdeps: pytest, apache-beam==2.23.0, cython==0.29.16, 
grpcio>=1.17.0,<=1.26.0, grpcio-tools>=1.3.5,<=1.14.2 
py36-cython inst: 
/__w/2/s/flink-python/.tox/.tmp/package/1/apache-flink-1.12.dev0.zip 
py36-cython installed: You are using pip version 10.0.1, however version 21.0.1 
is available.,You should consider upgrading via the 'pip install --upgrade pip' 
command.,apache-beam==2.23.0,apache-flink==1.12.dev0,attrs==20.3.0,avro-python3==1.9.1,certifi==2020.12.5,chardet==4.0.0,cloudpickle==1.2.2,crcmod==1.7,Cython==0.29.16,dill==0.3.1.1,docopt==0.6.2,fastavro==0.23.6,future==0.18.2,grpcio==1.35.0,grpcio-tools==1.14.2,hdfs==2.5.8,httplib2==0.17.4,idna==2.10,importlib-metadata==3.4.0,iniconfig==1.1.1,jsonpickle==1.2,mock==2.0.0,numpy==1.19.5,oauth2client==3.0.0,packaging==20.9,pandas==0.25.3,pbr==5.5.1,pluggy==0.13.1,protobuf==3.14.0,py==1.10.0,py4j==0.10.8.1,pyarrow==0.17.1,pyasn1==0.4.8,pyasn1-modules==0.2.8,pydot==1.4.1,pymongo==3.11.3,pyparsing==2.4.7,pytest==6.2.2,python-dateutil==2.8.0,pytz==2021.1,requests==2.25.1,rsa==4.7,six==1.15.0,toml==0.10.2,typing-extensions==3.7.4.3,urllib3==1.26.3,zipp==3.4.0
 
py36-cython run-test-pre: PYTHONHASHSEED='2134314047' 
py36-cython run-test: commands[0] | python --version 
Python 3.6.12 :: Anaconda, Inc. 
py36-cython run-test: commands[1] | pytest --durations=20
 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12892&view=logs&j=e92ecf6d-e207-5a42-7ff7-528ff0c5b259&t=d59eb898-29f7-5a99-91a7-b2dfc3e8a653



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21274) In per-job mode, when the upload speed of the HDFS file system is slow, the task exits abnormally, and the task archive file upload fails.

2021-02-03 Thread Wang Jichao (Jira)
Wang Jichao created FLINK-21274:
---

 Summary: In per-job mode, when the upload speed of the HDFS file 
system is slow, the task exits abnormally, and the task archive file upload 
fails.
 Key: FLINK-21274
 URL: https://issues.apache.org/jira/browse/FLINK-21274
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0, 1.11.0, 1.10.1
Reporter: Wang Jichao
 Attachments: 1.png, 2.png, 
application_1612404624605_0010-JobManager.log

This is a partial configuration of my Flink History service(flink-conf.yaml).
{code:java}
jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
{code}
I used {color:#0747a6}flink run -m yarn-cluster 
/cloud/service/flink/examples/batch/WordCount.jar{color} to submit a WordCount 
task to the Yarn cluster. Under normal circumstances, after the task is 
completed, the task execution information is archived to HDFS and the 
JobManager process then exits. However, when this archiving process takes a 
long time (e.g. because the HDFS write speed is slow), the task archive file 
upload fails.

The specific reproduction method is as follows:

Modify the 
{color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
 method to add a 5-second wait before the HDFS write action (simulating a slow 
write speed scenario).
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
        throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ...  // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
After making the above changes to the code, I cannot find the corresponding 
task on Flink's HistoryServer (refer to attachments 1.png and 2.png).

Then I browsed the JobManager log on Yarn (see the attachment 
application_1612404624605_0010-JobManager.log for details), and found that 
the following log line is missing:
{code:java}
INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process 
YarnJobClusterEntrypoint with exit code 0.{code}
Usually, if the task exits normally, a similar log line is printed before 
{color:#0747a6}System.exit(returnCode){color} executes.

Since no exception information is found in the JobManager log, the above 
situation indicates that the JobManager ran up to a certain point at which no 
user (non-daemon) thread remained in the JobManager process, causing the JVM 
to exit without completing the normal shutdown process.

Eventually I found out that multiple services (for example ioExecutor, 
metricRegistry, commonRpcService) are shut down asynchronously in 
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
 and further services (for example the executor) are shut down in the 
shutdown() method of metricRegistry. These shutdown actions are executed 
asynchronously and in parallel. If ioExecutor or the executor exits last, it 
causes the problem described above.

I propose to modify the following code to solve this problem. If this is 
confirmed to be a problem (it affects all versions since 1.9), please assign 
the ticket to me. Thank you.

Only the 
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
 method needs to be modified:
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    int returnCode;
    Throwable throwable = null;
    try {
        returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
    } catch (Throwable e) {
        throwable = e;
        returnCode = RUNTIME_FAILURE_RETURN_CODE;
    }

    LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
            clusterEntrypointName, returnCode, throwable);
    System.exit(returnCode);
}
{code}
 
The purpose of 

[jira] [Created] (FLINK-21273) Remove unused ExecutionVertexSchedulingRequirements

2021-02-03 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-21273:
--

 Summary: Remove unused ExecutionVertexSchedulingRequirements
 Key: FLINK-21273
 URL: https://issues.apache.org/jira/browse/FLINK-21273
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Yangze Guo


The only field of ExecutionVertexSchedulingRequirements still in use is the 
ExecutionVertexID; all other fields are not referenced by any class atm. We 
could replace the class with ExecutionVertexID.

This might be a subtask of FLINK-20589.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21272) 'Resuming Savepoint (rocks, scale down, rocks timers) end-to-end test' fails

2021-02-03 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21272:
-

 Summary: 'Resuming Savepoint (rocks, scale down, rocks timers) 
end-to-end test' fails
 Key: FLINK-21272
 URL: https://issues.apache.org/jira/browse/FLINK-21272
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.0
Reporter: Guowei Ma


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12893&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Kezhu Wang
Hi Xintong,

Thanks for the backgrounds!

I understand the impracticality of operator-level specifications and the value
of group-level specifications. I'm just not that confident about "coupling
between operator chaining / slot sharing"; it seems to me that it requires more
knowledge than "exposing operator chaining".

Best,
Kezhu Wang

On Thu, Feb 4, 2021 at 13:22 Xintong Song  wrote:

> Hi Kezhu,
>
> Maybe let me share some background first.
>
>- We at Alibaba have been using fine-grained resource management for
>many years, with Blink (an internal version of Flink).
>- We have been trying to contribute this feature to Apache Flink for
>many years now. However, we haven't succeeded, due to various reasons.
>   - Back then, I believe there were not many users that used
>   Flink in production at a very large scale, thus less demand for
> the feature.
>   - The feature on Blink is quite specific to our internal use cases
>   and scenarios. We have not made it general enough to cover the
> community's
>   common use cases.
>   - Divergences between Flink & Blink code bases.
>- Blink used operator-level resource interfaces. According to our years
>of production experiences, we believe that specifying operator-level
>resources are neither necessary nor easy-to-use. This is why we propose
>group-level interfaces.
>
> Back to your questions.
>
> I saw the discussion to keep slot sharing as a hint, but in reality, will
> > SSG jobs expect to fail or
> > run slowly if scheduler does not respect it ? A slot with 20GB memory is
> > different from two 1GB
> > default sized slots. So, we actually depend on the scheduler
> > version/implementation/de-facto behavior if we
> > claim it is a hint.
> >
>
> SSG-based resource requirements are considered hints because the SSG itself
> is a hint. There's no guarantee that operators of a SSG will always be
> scheduled together. I think you have a good point that, if SSGs are not
> respected, is it prefered to fail the job or to interpret the resource of
> an actual slot. It's possible that we provide a configuration option and
> leave that decision to the users. However, that is a design choice we need
> to make when there's indeed a need for not respecting the SSGs.
>
> Do you mean code-path or production environment ? If it is code-path, could
> > you please point out where
> > the story breaks ?
> >
> > From the discussion and history, could I consider FLIP-156 a
> redirection
> > rather than an inheritance/enhancement
> > of the current half-cooked/ancient implementation?
> >
>
> If you try to set the operator resources, you would find that it won't work
> at the moment. There are several things not ready.
>
>- Interfaces for setting operator resources are never really exposed to
>users.
>- The resource manager never allocates slots with the requested
>resources.
>- Managed memory size specified for operators will not be respected,
>because managed memory is shared within a slot with a different
> approach.
>
> While the first two points relate to the feature not being ready yet, the
> last point is closely tied to specifying operator-level
> resources.
>
> To sum up, we do not want to support operator-level specification in the first
> step, for the following reasons.
>
>- It's not likely needed, due to poor usability compared to the
>SSG-based approach.
>- It introduces the complexity to deal with the managed memory sharing.
>- It introduces the complexity to deal with combining resource
>requirements from two different levels.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Feb 3, 2021 at 7:50 PM Kezhu Wang  wrote:
>
> > Hi Till,
> >
> > Based on what I understood, if not wrong, the door is not closed after
> SSG
> > resource specifying. So, hope it could be useful in potential future
> > improvement.
> >
> > Best,
> > Kezhu Wang
> >
> >
> > On February 3, 2021 at 18:07:21, Till Rohrmann (trohrm...@apache.org)
> > wrote:
> >
> > Thanks for sharing your thoughts Kezhu. I like your ideas of how
> > per-operator and SSG requirements can be combined. I've also thought
> about
> > defining a default resource profile for all tasks which have no resources
> > configured. That way all operators would have resources assigned if the
> > user chooses to use this feature.
> >
> > As Yangze and Xintong have said, we have decided to first only support
> > specifying resources for SSGs as this seems more user friendly. Based on
> > the feedback for this feature one potential development direction might
> be
> > to allow the resource specification on per-operator basis. Here we could
> > pick up your ideas.
> >
> > Cheers,
> > Till
> >
> > On Wed, Feb 3, 2021 at 7:31 AM Xintong Song 
> wrote:
> >
> > > Thanks for your feedback, Kezhu.
> > >
> > > I think Flink *runtime* already has an ideal granularity for resource
> > > > management 'task'. If there is
> > > > a slot shared by multiple tas

Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility

2021-02-03 Thread Rui Li
Hi Jingsong, Godfrey and Jark,

I have updated the FLIP to incorporate your suggestions. Please let me know
if you have further comments. Thank you.

On Wed, Feb 3, 2021 at 4:51 PM Rui Li  wrote:

> Thanks Godfrey & Jark for your comments!
>
> Since both of you mentioned the naming of the parser factory, I'll answer
> this question first.
>
> I only intend to introduce the pluggable Parser for blink planner. So
> the BlinkParserFactory will be added to the flink-table-planner-blink
> module. We already have several factory classes there such as
> "BlinkPlannerFactory" and "BlinkExecutorFactory". So it seems
> "BlinkParserFactory" is following the naming convention of existing
> classes. But I'm also fine with "ParserFactory" if we're going to remove
> the "blink" prefix anyway.
> For the concrete implementation, I prefer "DefaultParserFactory". Having a
> "FlinkParserFactory" in blink planner seems a little weird. We can revisit
> this when we really get rid of the legacy planner.
>
> To answer the other questions:
>
> @Godfrey
> Yes I agree DDLs should also be handled by the HiveParser. That'll give
> users more consistent experience.
>
> The "Go Beyond Hive" section is mainly about decoupling the dialect from
> hive objects, so that users can write HiveQL to query non-hive tables or
> call non-hive functions. I think it's in the scope of this FLIP, but it
> doesn't have to be done in the first release. I'll update the FLIP to be
> more specific about it.
>
> @Jark
> Creating a new Parser only when dialect changes is a good idea. I'll
> update the FLIP to mention that.
>
> The 3.x new features we need to support in this FLIP include:
>
>1. PK & NOT NULL constraints
>2. Alter DB to change location
>
> These DDLs are already covered by FLIP-123. Missing them will be a
> regression.
>
> Other new features can be identified with more tests and user feedback,
> and will be supported incrementally.
>
> By "we can use a newer version to support older versions", I mean syntax
> in the new version is a superset of the old one. But we still need extra
> efforts to make sure the copied code works with different hive versions,
> e.g. more shim methods are required. So I'd rather start with a popular
> version than the newest version.
>
>
> On Wed, Feb 3, 2021 at 11:51 AM Jark Wu  wrote:
>
>> Thanks Rui for the great proposal, I believe this can be very attractive
>> for many Hive users.
>>
>> The FLIP looks good to me in general, I only have some minor comments:
>>
>> 1) BlinkParserFactory
>> IIUC, BlinkParserFactory is located in the flink-table-api-java module
>> with
>> the Parser interface there.
>> I suggest renaming it to `ParserFactory`, because it creates Parser
>> instead
>> of BlinkParser.
>> And the implementations can be `HiveParserFactory` and
>> `FlinkParserFactory`.
>> I think we should avoid the `Blink` keyword in interfaces, blink planner
>> is
>> already the default planner and
>> the old planner will be removed in the near future. There will be no
>> `blink` in the future then.
>>
>> 2) "create a new instance each time getParser is called"
>> Creating a parser every time getParser is called sounds heavy to me. I
>> think we can improve this by simply
>> caching the Parser instance, and creating a new one if the current
>> sql-dialect
>> is different from the cached Parser's.
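
A minimal sketch of that caching idea (SqlDialect and Parser are existing
Flink types; the factory and supplier wiring below is an illustrative
assumption):

import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.delegation.Parser;

public class CachingParserSupplier {

    private final Function<SqlDialect, Parser> parserFactory;
    private final Supplier<SqlDialect> currentDialect;

    private SqlDialect cachedDialect;
    private Parser cachedParser;

    public CachingParserSupplier(
            Function<SqlDialect, Parser> parserFactory,
            Supplier<SqlDialect> currentDialect) {
        this.parserFactory = parserFactory;
        this.currentDialect = currentDialect;
    }

    public Parser getParser() {
        SqlDialect dialect = currentDialect.get();
        // Reuse the cached Parser unless the dialect changed since it was created.
        if (cachedParser == null || dialect != cachedDialect) {
            cachedParser = parserFactory.apply(dialect);
            cachedDialect = dialect;
        }
        return cachedParser;
    }
}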
>>
>> 3) Hive version
>> How much code needs to be done to support new features in 3.x based on
>> 2.x?
>> Is this also included in this FLIP/release?
>> I don't fully understand this because the FLIP says "we can use a newer
>> version to support older versions."
>>
>> Best,
>> Jark
>>
>> On Wed, 3 Feb 2021 at 11:48, godfrey he  wrote:
>>
>> > Thanks for bringing up the discussion, Rui!
>> >
>> > Regarding the DDL part in the "Introduce HiveParser" section,
>> > I would like to choose the second option. Because if we could
>> > use one hive parser to parse all hive SQLs, we would not need to copy the
>> > Calcite parser code, and the framework and the code will be very simple.
>> >
>> > Regarding the "Go Beyond Hive" section, is that the scope of this FLIP ?
>> > Could you list all the extensions and give some examples ?
>> >
>> > One minor suggestion about the name of ParserImplFactory.
>> > How about renaming ParserImplFactory to DefaultParserFactory ?
>> >
>> > Best,
>> > Godfrey
>> >
>> > Rui Li  wrote on Wed, Feb 3, 2021 at 11:16 AM:
>> >
>> > > Hi Jingsong,
>> > >
>> > > Thanks for your comments and they're very good questions.
>> > >
>> > > Regarding # Version, we need to do some tradeoff here. Choosing the
>> > latest
>> > > 3.x will cover all the features we want to support. But as you said,
>> 3.x
>> > > and 2.x can have some differences and requires more efforts to support
>> > > lower versions. I decided to pick 2.x and evolve from there to support
>> > new
>> > > features in 3.x. Because I think most hive users, especially those who
>> > are
>> > > likely to be interested in this feature, are still using 2.x or even
>> 1.x.
>> > > S

Re: [RESULT] [VOTE] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Yangze Guo
Edit: The voting time of FLIP-156[1] has passed.

Best,
Yangze Guo

On Thu, Feb 4, 2021 at 1:25 PM Yangze Guo  wrote:
>
> Hi all,
>
> The voting time for changing the Public API of FLIP-156[1] has passed.
> I'm closing the vote now.
>
> There were five +1 votes, four of which are binding:
>
> - Xintong Song (binding)
> - Till (binding)
> - Chesnay (binding)
> - Yang Wang (non-binding)
> - Zhu Zhu (binding)
>
> There were no -1 votes.
>
> Thus, FLIP-156 has been accepted.
>
> Thanks everyone for joining the discussion and giving feedback!
>
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-156-Runtime-Interfaces-for-Fine-Grained-Resource-Requirements-td48419.html
>
> Best,
> Yangze Guo


[RESULT] [VOTE] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Yangze Guo
Hi all,

The voting time for changing the Public API of FLIP-156[1] has passed.
I'm closing the vote now.

There were five +1 votes, four of which are binding:

- Xintong Song (binding)
- Till (binding)
- Chesnay (binding)
- Yang Wang (non-binding)
- Zhu Zhu (binding)

There were no -1 votes.

Thus, FLIP-156 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-156-Runtime-Interfaces-for-Fine-Grained-Resource-Requirements-td48419.html

Best,
Yangze Guo


Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Xintong Song
Hi Kezhu,

Maybe let me share some background first.

   - We at Alibaba have been using fine-grained resource management for
   many years, with Blink (an internal version of Flink).
   - We have been trying to contribute this feature to Apache Flink for
   many years now. However, we haven't succeeded, due to various reasons.
  - Back then, I believe there were not many users that used
  Flink in production at a very large scale, thus less demand for
the feature.
  - The feature on Blink is quite specific to our internal use cases
  and scenarios. We have not made it general enough to cover the
community's
  common use cases.
  - Divergences between Flink & Blink code bases.
   - Blink used operator-level resource interfaces. According to our years
   of production experiences, we believe that specifying operator-level
   resources is neither necessary nor easy to use. This is why we propose
   group-level interfaces.

Back to your questions.

I saw the discussion to keep slot sharing as a hint, but in reality, will
> SSG jobs expect to fail or
> run slowly if scheduler does not respect it ? A slot with 20GB memory is
> different from two 1GB
> default sized slots. So, we actually depend on the scheduler
> version/implementation/de-facto behavior if we
> claim it is a hint.
>

SSG-based resource requirements are considered hints because the SSG itself
is a hint. There's no guarantee that operators of a SSG will always be
scheduled together. I think you have a good point that, if SSGs are not
respected, is it preferred to fail the job or to interpret the resource of
an actual slot. It's possible that we provide a configuration option and
leave that decision to the users. However, that is a design choice we need
to make when there's indeed a need for not respecting the SSGs.

Do you mean code-path or production environment ? If it is code-path, could
> you please point out where
> the story breaks ?
>
> From the discussion and history, could I consider FLIP-156 a redirection
> rather than an inheritance/enhancement
> of the current half-cooked/ancient implementation?
>

If you try to set the operator resources, you would find that it won't work
at the moment. There are several things not ready.

   - Interfaces for setting operator resources are never really exposed to
   users.
   - The resource manager never allocates slots with the requested
   resources.
   - Managed memory size specified for operators will not be respected,
   because managed memory is shared within a slot with a different approach.

While the first two points relate to the feature not being ready yet, the
last point is closely tied to specifying operator-level
resources.

To sum up, we do not want to support operator-level specification in the first
step, for the following reasons.

   - It's not likely needed, due to poor usability compared to the
   SSG-based approach.
   - It introduces the complexity to deal with the managed memory sharing.
   - It introduces the complexity to deal with combining resource
   requirements from two different levels.


Thank you~

Xintong Song



On Wed, Feb 3, 2021 at 7:50 PM Kezhu Wang  wrote:

> Hi Till,
>
> Based on what I understood, if not wrong, the door is not closed after SSG
> resource specifying. So, hope it could be useful in potential future
> improvement.
>
> Best,
> Kezhu Wang
>
>
> On February 3, 2021 at 18:07:21, Till Rohrmann (trohrm...@apache.org)
> wrote:
>
> Thanks for sharing your thoughts Kezhu. I like your ideas of how
> per-operator and SSG requirements can be combined. I've also thought about
> defining a default resource profile for all tasks which have no resources
> configured. That way all operators would have resources assigned if the
> user chooses to use this feature.
>
> As Yangze and Xintong have said, we have decided to first only support
> specifying resources for SSGs as this seems more user friendly. Based on
> the feedback for this feature one potential development direction might be
> to allow the resource specification on per-operator basis. Here we could
> pick up your ideas.
>
> Cheers,
> Till
>
> On Wed, Feb 3, 2021 at 7:31 AM Xintong Song  wrote:
>
> > Thanks for your feedback, Kezhu.
> >
> > I think Flink *runtime* already has an ideal granularity for resource
> > > management 'task'. If there is
> > > a slot shared by multiple tasks, that slot's resource requirement is
> > simply the
> > > sum of all its logical
> > > slots. So basically, there is no resource requirement for
> SlotSharingGroup
> > > in runtime until now,
> > > right ?
> >
> > That is a half-cooked implementation, coming from the previous attempts
> > (years ago) trying to deliver the fine-grained resource management
> feature,
> > and never really put into use.
> >
> > From the FLIP and discussion, I assume that SSG resource specifying will
> > > override operator-level
> > > resource specifying if both are specified ?
> > >
> > Actually, I think we should

[jira] [Created] (FLINK-21271) DataStreamTests.test_key_by_on_connect_stream test failed with "ArrayIndexOutOfBoundsException" in py35

2021-02-03 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-21271:


 Summary: DataStreamTests.test_key_by_on_connect_stream test failed 
with "ArrayIndexOutOfBoundsException" in py35
 Key: FLINK-21271
 URL: https://issues.apache.org/jira/browse/FLINK-21271
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.2
Reporter: Huang Xingbo


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12892&view=logs&j=e92ecf6d-e207-5a42-7ff7-528ff0c5b259&t=d59eb898-29f7-5a99-91a7-b2dfc3e8a653]
{code:java}
2021-02-03T23:58:12.7918092Z E   Caused by: 
java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 0
2021-02-03T23:58:12.7918584Z E  at 
java.base/java.util.ArrayList.add(ArrayList.java:487)
2021-02-03T23:58:12.7918997Z E  at 
java.base/java.util.ArrayList.add(ArrayList.java:499)
2021-02-03T23:58:12.7919587Z E  at 
org.apache.flink.streaming.api.runners.python.beam.PythonSharedResources.addPythonEnvironmentManager(PythonSharedResources.java:66)
2021-02-03T23:58:12.7920305Z E  at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:253)
2021-02-03T23:58:12.7921116Z E  at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:113)
2021-02-03T23:58:12.7921850Z E  at 
org.apache.flink.streaming.api.operators.python.TwoInputPythonFunctionOperator.open(TwoInputPythonFunctionOperator.java:144)
2021-02-03T23:58:12.7922555Z E  at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428)
2021-02-03T23:58:12.7923247Z E  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
2021-02-03T23:58:12.7923983Z E  at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
2021-02-03T23:58:12.7924610Z E  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
2021-02-03T23:58:12.7925161Z E  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
2021-02-03T23:58:12.7925683Z E  at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
2021-02-03T23:58:12.7926259Z E  at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
2021-02-03T23:58:12.7926849Z E  at 
java.base/java.lang.Thread.run(Thread.java:834)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21270) Generate the slot request with respect to the resource specification of SlotSharingGroup if present

2021-02-03 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-21270:
--

 Summary: Generate the slot request with respect to the resource 
specification of SlotSharingGroup if present
 Key: FLINK-21270
 URL: https://issues.apache.org/jira/browse/FLINK-21270
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Yangze Guo
 Fix For: 1.13.0


Currently, slot requests for SSGs are generated by 
SlotSharingExecutionSlotAllocator. We propose to make 
SlotSharingExecutionSlotAllocator use the resource requirements of the 
corresponding SlotSharingGroups when generating the slot requests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21269) Introduce runtime interfaces for specifying SSG-based resource requirements

2021-02-03 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-21269:
--

 Summary: Introduce runtime interfaces for specifying SSG-based 
resource requirements
 Key: FLINK-21269
 URL: https://issues.apache.org/jira/browse/FLINK-21269
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Yangze Guo
 Fix For: 1.13.0


As the entrypoint of the unified runtime, StreamGraphGenerator takes 
Transformations and various settings from the user-facing development APIs and 
generates a StreamGraph accordingly.

We propose to add a StreamGraphGenerator#setSlotSharingGroupResource interface 
for specifying fine-grained resource requirements for SSGs.

The specified SSG resource requirements need to be passed all the way to the 
corresponding SlotSharingGroup in the ExecutionGraph.
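
For illustration, usage could look roughly like this (a sketch based on the 
FLIP-156 discussion; the exact signatures were still subject to change at this 
point):
{code:java}
// Sketch only -- the method name follows this proposal, the builder calls
// follow the existing ResourceProfile API.
ResourceProfile ssgResource =
        ResourceProfile.newBuilder()
                .setCpuCores(2.0)
                .setTaskHeapMemoryMB(1024)
                .build();

// The requirement is attached to slot sharing group "ssg-1" and must be
// carried through to the SlotSharingGroup in the ExecutionGraph.
streamGraphGenerator.setSlotSharingGroupResource("ssg-1", ssgResource);
{code}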



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21268) Logs view can't scroll in Firefox

2021-02-03 Thread Frost Wong (Jira)
Frost Wong created FLINK-21268:
--

 Summary: Logs view can't scroll in Firefox
 Key: FLINK-21268
 URL: https://issues.apache.org/jira/browse/FLINK-21268
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.12.1
 Environment: macOS 10.15.7

Firefox 85(latest) and Firefox Developer Edition

Flink 1.10 (the version with which I began using Flink) to the current Flink 1.12
Reporter: Frost Wong


The logs view in both the TaskManager and the JobManager can't scroll in Firefox.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21267) FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-21267:
--

 Summary: FLIP-156: Runtime Interfaces for Fine-Grained Resource 
Requirements
 Key: FLINK-21267
 URL: https://issues.apache.org/jira/browse/FLINK-21267
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination
Reporter: Yangze Guo
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21266) The POST request caused a JSON serialization error

2021-02-03 Thread chenkaiyin (Jira)
chenkaiyin created FLINK-21266:
--

 Summary: The POST request caused a JSON serialization error
 Key: FLINK-21266
 URL: https://issues.apache.org/jira/browse/FLINK-21266
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.11.3
Reporter: chenkaiyin


According to the documentation, the following interfaces can be requested 
directly, but they return an error:

POST /jobs/:job/savepoints
POST /jobs/:job/stop

An error is reported when deserializing "{}" in this code 
(AbstractHandler.java:150):

MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());

The reason is that the cancelJob field of SavepointTriggerRequestBody is not 
annotated with @Nullable.
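
A sketch of the fix suggested above (field and class names follow the report; 
the exact class layout in Flink may differ):
{code:java}
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;

public class SavepointTriggerRequestBody {

    @JsonProperty("target-directory")
    @Nullable
    private final String targetDirectory;

    @JsonProperty("cancel-job")
    private final boolean cancelJob;

    @JsonCreator
    public SavepointTriggerRequestBody(
            // Per the report, optional parameters must tolerate a missing
            // value, hence @Nullable and the boxed Boolean.
            @Nullable @JsonProperty("target-directory") String targetDirectory,
            @Nullable @JsonProperty("cancel-job") Boolean cancelJob) {
        this.targetDirectory = targetDirectory;
        this.cancelJob = cancelJob != null && cancelJob;
    }
}
{code}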




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-02-03 Thread Thomas Weise
Thanks for initiating this discussion and creating the proposal!

I would like to contribute to this effort. Has there been related activity
since the FLIP was created?

If not, I would like to start work on a PoC to validate the design.

Questions/comments:

There could be more use cases for a hybrid source beyond a predefined
sequence that is fixed at job submission time. For example, the source
connector could be used to migrate from one external system to another
(like Kafka1 .. KafkaN - based on external trigger/discovery).

I agree with @Aljoscha Krettek  that it would be
preferable to solve this without special "switchable" interfaces and have
it work with any FLIP-27 source as is. Performing the switch using the
enumerator checkpoint appears viable (not proven though unless coded 😉).
The actual FLIP-27 source reader would need to signal to the
"HybridSourceReader" (HSR) that it is done, and then the HSR would send
the switch event to the coordinator?

To further confirm my understanding:

The actual split type that flows between enumerator and reader would be
"HybridSourceSplit" and it would wrap the specific split (in the example
either HDFS or Kafka)?

Switching relies on the previous source's end position to be communicated
as start position to the next source. The position(s) can be exchanged
through the checkpoint state, but "HybridSplitEnumerator" still needs a way
to extract them from the actual enumerator. That could be done by the
enumerator checkpoint state mapping function looking at the current split
assignments, which would not require modification of existing enumerators?
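
To make the split wrapping concrete, here is a rough sketch
("HybridSourceSplit" as named above; the fields and id scheme are my
assumptions):

import org.apache.flink.api.connector.source.SourceSplit;

public class HybridSourceSplit implements SourceSplit {

    // Index of the underlying source in the hybrid source's defined order.
    private final int sourceIndex;
    private final SourceSplit wrappedSplit;

    public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
        this.sourceIndex = sourceIndex;
        this.wrappedSplit = wrappedSplit;
    }

    public int sourceIndex() {
        return sourceIndex;
    }

    public SourceSplit wrappedSplit() {
        return wrappedSplit;
    }

    @Override
    public String splitId() {
        // Prefix with the source index so ids stay unique across the chained sources.
        return sourceIndex + "-" + wrappedSplit.splitId();
    }
}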

Cheers,
Thomas


On Fri, Jan 8, 2021 at 4:07 AM Aljoscha Krettek  wrote:

> Hi Nicholas,
>
> Thanks for starting the discussion!
>
> I think we might be able to simplify this a bit and re-use existing
> functionality.
>
> There is already `Source.restoreEnumerator()` and
> `SplitEnumerator.snapshotState()`. This seems to be roughly what the
> Hybrid Source needs. When the initial source finishes, we can take a
> snapshot (which should include data that the follow-up sources need for
> initialization). Then we need a function that maps the enumerator
> checkpoint types between initial source and new source and we are good
> to go. We wouldn't need to introduce any additional interfaces for
> sources to implement, which would fragment the ecosystem between sources
> that can be used in a Hybrid Source and sources that cannot be used in a
> Hybrid Source.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 2020/11/03 02:34, Nicholas Jiang wrote:
> >Hi devs,
> >
> >I'd like to start a new FLIP to introduce the Hybrid Source. The hybrid
> >source is a source that contains a list of concrete sources. The hybrid
> >source reads from each contained source in the defined order. It switches
> >from source A to the next source B when source A finishes.
> >
> >In practice, many Flink jobs need to read data from multiple sources in
> >sequential order. Change Data Capture (CDC) and machine learning feature
> >backfill are two concrete scenarios of this consumption pattern. Users may
> >have to either run two different Flink jobs or have some hacks in the
> >SourceFunction to address such use cases.
> >
> >To support above scenarios smoothly, the Flink jobs need to first read
> from
> >HDFS for historical data then switch to Kafka for real-time records. The
> >hybrid source has several benefits from the user's perspective:
> >
> >- Switching among multiple sources is easy based on the switchable source
> >implementations of different connectors.
> >- This supports automatic switching for user-defined switchable
> >sources that constitute the hybrid source.
> >- There is a complete and effective mechanism to support smooth source
> >migration between historical and real-time data.
> >
> >Therefore, in this discussion, we propose to introduce a “Hybrid Source”
> API
> >built on top of the new Source API (FLIP-27) to help users to smoothly
> >switch sources. For more detail, please refer to the FLIP design doc[1].
> >
> >I'm looking forward to your feedback.
> >
> >[1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> ><
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> >
> >
> >Best,
> >Nicholas Jiang
> >
> >
> >
> >--
> >Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-03 Thread Rui Li
Hi guys,

Regarding #3 and #4, I agree SHOW JARS is more consistent with other
commands than LIST JARS. I don't have a strong opinion about REMOVE vs
DELETE though.

While flink doesn't need to follow hive syntax, as far as I know, most
users who are requesting these features are previously hive users. So I
wonder whether we can support both LIST/SHOW JARS and REMOVE/DELETE JARS
as synonyms? It's just like lots of systems accept both EXIT and QUIT as
the command to terminate the program. So if that's not hard to achieve, and
will make users happier, I don't see a reason why we must choose one over
the other.

On Wed, Feb 3, 2021 at 10:33 PM Timo Walther  wrote:

> Hi everyone,
>
> some feedback regarding the open questions. Maybe we can discuss the
> `TableEnvironment.executeMultiSql` story offline to determine how we
> proceed with this in the near future.
>
> 1) "whether the table environment has the ability to update itself"
>
> Maybe there was some misunderstanding. I don't think that we should
> support `tEnv.getConfig.getConfiguration.setString("table.planner",
> "old")`. Instead I'm proposing to support
> `TableEnvironment.create(Configuration)` where planner and execution
> mode are read immediately and subsequent changes to these options will
> have no effect. We are doing it similarly in `new
> StreamExecutionEnvironment(Configuration)`. These two ConfigOptions
> must not be SQL Client specific but can be part of the core table code
> base. Many users would like to get a 100% preconfigured environment from
> just Configuration. And this is not possible right now. We can solve
> both use cases in one change.
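
For illustration, the proposed usage could look like this (a sketch:
`TableEnvironment.create(Configuration)` and the option keys shown were
proposals at this point, not existing API):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class ProposedBootstrap {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("table.planner", "blink");             // proposed key
        conf.setString("table.execution-mode", "streaming");  // proposed key

        // Planner and execution mode would be read exactly once here; later
        // changes to these options would not affect the environment.
        TableEnvironment tEnv = TableEnvironment.create(conf);
    }
}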
>
> 2) "the sql client, we will maintain two parsers"
>
> I remember we had some discussion about this and decided that we would
> like to maintain only one parser. In the end it is "One Flink SQL" where
> commands influence each other also with respect to keywords. It should
> be fine to include the SQL Client commands in the Flink parser. Of
> course the table environment would not be able to handle the `Operation`
> instance that would be the result but we can introduce hooks to handle
> those `Operation`s. Or we introduce parser extensions.
>
> Can we skip `table.job.async` in the first version? We should further
> discuss whether we introduce a special SQL clause for wrapping async
> behavior or if we use a config option? Esp. for streaming queries we
> need to be careful and should force users to either "one INSERT INTO" or
> "one STATEMENT SET".
>
> 3) 4) "HIVE also uses these commands"
>
> In general, Hive is not a good reference. Aligning the commands more
> with the remaining commands should be our goal. We just had a MODULE
> discussion where we selected SHOW instead of LIST. But it is true that
> JARs are not part of the catalog which is why I would not use
> CREATE/DROP. ADD/REMOVE are commonly siblings in the English language.
> Take a look at the Java collection API as another example.
>
> 6) "Most of the commands should belong to the table environment"
>
> Thanks for updating the FLIP; this makes things easier to understand. It
> is good to see that most commands will be available in TableEnvironment.
> However, I would also support SET and RESET for consistency. Again, from
> an architectural point of view, if we would allow some kind of
> `Operation` hook in table environment, we could check for SQL Client
> specific options and forward to regular `TableConfig.getConfiguration`
> otherwise. What do you think?
>
> Regards,
> Timo
>
>
> On 03.02.21 08:58, Jark Wu wrote:
> > Hi Timo,
> >
> > I will respond some of the questions:
> >
> > 1) SQL client specific options
> >
> > Whether it starts with "table" or "sql-client" depends on where the
> > configuration takes effect.
> > If it is a table configuration, we should make clear what the behavior is
> > when users change
> > the configuration during the lifecycle of TableEnvironment.
> >
> > I agree with Shengkai `sql-client.planner` and
> `sql-client.execution.mode`
> > are something special
> > that can't be changed after TableEnvironment has been initialized. You
> can
> > see
> > `StreamExecutionEnvironment` provides `configure()`  method to override
> > configuration after
> > StreamExecutionEnvironment has been initialized.
> >
> > Therefore, I think it would be better to still use  `sql-client.planner`
> > and `sql-client.execution.mode`.
> >
> > 2) Execution file
> >
> > From my point of view, there is a big difference between
> > `sql-client.job.detach` and
> > `TableEnvironment.executeMultiSql()` that `sql-client.job.detach` will
> > affect every single DML statement
> > in the terminal, not only the statements in SQL files. I think the single
> > DML statement in the interactive
> > terminal is something like tEnv#executeSql() instead of
> > tEnv#executeMultiSql.
> > So I don't like the "multi" and "sql" keyword in `table.multi-sql-async`.
> > I just find that runtime provides a configuration called
> > "execution.attached

Re: PyFlink DataStream API

2021-02-03 Thread Shuiqiang Chen
Hi Partha,

PyFlink DataStream API is a newly introduced feature in Flink 1.12.
Currently, some basic operations (map/flat_map/filter/key_by/process, etc.),
as well as timer and state access operations, are supported. We are also continuously
improving and enriching these APIs. Looking forward to your feedback!

Best,
Shuiqiang

Partha Mishra  wrote on Wed, Feb 3, 2021 at 8:08 PM:

> Hi,
>
> Can anyone suggest if the PyFlink DataStream APIs (available from Flink 1.12)
> are being properly tested and used in any production environment/use-case
> without any issue, and give the expected performance?
>
>


[jira] [Created] (FLINK-21265) SQLClientSchemaRegistryITCase.testReading failed with DockerClientException unauthorized

2021-02-03 Thread Jark Wu (Jira)
Jark Wu created FLINK-21265:
---

 Summary: SQLClientSchemaRegistryITCase.testReading failed with 
DockerClientException unauthorized
 Key: FLINK-21265
 URL: https://issues.apache.org/jira/browse/FLINK-21265
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.13.0
Reporter: Jark Wu


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12865&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529


{code}
Feb 03 17:50:00 [INFO] ---
Feb 03 17:50:00 [INFO]  T E S T S
Feb 03 17:50:00 [INFO] ---
Feb 03 17:50:56 [INFO] Running 
org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase
Feb 03 17:52:41 [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 105.31 s <<< FAILURE! - in 
org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase
Feb 03 17:52:41 [ERROR] 
testReading(org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase)  
Time elapsed: 2.278 s  <<< ERROR!
Feb 03 17:52:41 java.util.concurrent.ExecutionException: 
org.testcontainers.containers.ContainerLaunchException: Container startup failed
Feb 03 17:52:41 at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 03 17:52:41 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
Feb 03 17:52:41 at 
org.testcontainers.containers.GenericContainer.start(GenericContainer.java:307)
Feb 03 17:52:41 at 
org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1021)
Feb 03 17:52:41 at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
Feb 03 17:52:41 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Feb 03 17:52:41 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
Feb 03 17:52:41 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
Feb 03 17:52:41 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
Feb 03 17:52:41 at 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
Feb 03 17:52:41 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
Feb 03 17:52:41 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
Feb 03 17:52:41 at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
Feb 03 17:52:41 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
Feb 03 17:52:41 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
Feb 03 17:52:41 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
Feb 03 17:52:41 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
Feb 03 17:52:41 at java.lang.Thread.run(Thread.java:748)
Feb 03 17:52:41 Caused by: 
org.testcontainers.containers.ContainerLaunchException: Container startup failed
Feb 03 17:52:41 at 
org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:327)
Feb 03 17:52:41 at 
org.testcontainers.containers.KafkaContainer.doStart(KafkaContainer.java:94)
Feb 03 17:52:41 at 
org.testcontainers.containers.GenericContainer.start(GenericContainer.java:308)
Feb 03 17:52:41 at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
Feb 03 17:52:41 at 
java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
Feb 03 17:52:41 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
Feb 03 17:52:41 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Feb 03 17:52:41 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Feb 03 17:52:41 ... 1 more
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21264) In yarn-per-job mode, when the TM has stopped, the JM doesn't release resources and keeps running

2021-02-03 Thread tonychan (Jira)
tonychan created FLINK-21264:


 Summary: In yarn-per-job mode, when the TM stops the JM doesn't release 
resources and keeps running
 Key: FLINK-21264
 URL: https://issues.apache.org/jira/browse/FLINK-21264
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.12.0
Reporter: tonychan
 Attachments: image-2021-02-03-17-42-53-269.png, 
image-2021-02-03-17-43-28-017.png

I set up a socket source (nc -lk ) for a Flink test. At first the Flink job 
runs fine, but when the source server stopped, the Flink TM stopped with 
status FINISHED while the JM kept running.

!image-2021-02-03-17-42-53-269.png!

!image-2021-02-03-17-43-28-017.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] End to End latency measurement

2021-02-03 Thread Sherin Thomas
Hello Folks,

We have some pipelines that contain multiple hops of Flink jobs with a Kafka
transport layer in between. Data is finally written to analytical stores.
We want to measure e-2-e latency from the first source all the way to the last
sink (that writes to the analytical stores) and possibly also at other hops
in the middle.

Here are some strategies I'm thinking about, would love your thoughts on
the various approaches:
1. *Event-time based latency computation* at various hops. Each record will
contain an event_time based on when it is written to the first Kafka source ->
When there are windowed aggregations and such, it's tricky to translate the
correct event time to the derived event. So this is tightly coupled with
user logic and hence not favorable.
2. *Latency markers introduced in the Kafka stream* that will be consumed
by the Flink jobs -> We can potentially introduce latency markers along
with regular data. They would share the same data envelope schema so they can
travel with the regular data. Operators will need to identify them, forward
them appropriately, and also exclude them from aggregations and such, which
makes this approach complex (a rough sketch follows this list). Unless there
is an elegant way to piggyback on the internal Flink latency marker movement
for e-2-e latency tracking? *Would love to hear your thoughts about this.*
3. *Sum of Kafka consumer lag* across all the Kafka topics in the pipeline
-> This will give us tail latencies. We would ideally love to get a histogram
of latencies across all the events.
4. *Global minimum watermark* -> In this approach, I'm thinking about
periodically checking the global minimum watermark and using that to
determine tail latency. This would probably not give a good histogram of
latencies across all partitions and data, but so far this approach seems
like the easiest to generalize across all types of operators and
aggregations. Would love to hear your thoughts on the feasibility of
this.
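
For option 2, here is a minimal sketch of how a marker could be measured at
one hop. Everything around the `Envelope` type is an assumption about our own
schema (an isMarker flag plus the emit timestamp of the first hop), not an
existing Flink API:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Assumed shared envelope for data and markers (illustrative only). */
interface Envelope {
    boolean isMarker();
    long getEmitTimeMillis(); // wall-clock time the marker entered the first topic
}

class LatencyMarkerMeasurer extends ProcessFunction<Envelope, Envelope> {

    private transient Histogram e2eLatency;

    @Override
    public void open(Configuration parameters) {
        // Expose latency as a histogram metric so we get percentiles, not just tails.
        e2eLatency = getRuntimeContext().getMetricGroup()
                .histogram("e2eLatency", new DescriptiveStatisticsHistogram(10_000));
    }

    @Override
    public void processElement(Envelope value, Context ctx, Collector<Envelope> out) {
        if (value.isMarker()) {
            // Latency since the marker was written to the first Kafka source.
            e2eLatency.update(System.currentTimeMillis() - value.getEmitTimeMillis());
        }
        // Forward everything; downstream aggregations still have to skip markers.
        out.collect(value);
    }
}

The downside is visible right in the sketch: every operator in the pipeline
needs this marker awareness, which is the complexity mentioned above.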


Let me know what you think. And if there are better ways to measure
end-2-end latency that would be lovely!


Thanks,
Sherin


Re: Is development in FlinkML still active?

2021-02-03 Thread Badrul Chowdhury
Hi Becket,

I was looking into picking an item to work on and I came across this open
PR for the BinarizerMapper:

https://issues.apache.org/jira/browse/FLINK-13669
https://github.com/apache/flink/pull/9406

Could you please help me understand what is required for this to be merged?
Can I help?

Thanks,
Badrul

On Thu, Jan 7, 2021 at 9:05 AM Becket Qin  wrote:

> Thanks Badrul,
>
> Contribution to the ML pipeline is highly appreciated! Please don't
> hesitate to reach out if you got any questions.
>
> Cheers,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jan 8, 2021 at 1:02 AM Badrul Chowdhury <
> badrulchowdhur...@gmail.com>
> wrote:
>
> > Thanks all for the pointers! The architecture outlined in FLIP39
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
> > >
> > seems
> > promising and the community seems to have accepted it- let me look into
> > contributing to that effort.
> >
> > Thanks,
> > Badrul
> >
> > On Thu, Jan 7, 2021 at 4:29 AM Becket Qin  wrote:
> >
> > > Hi Flavio,
> > >
> > > Currently most of the Flink AI efforts are in ecosystem projects.
> > Primarily
> > > flink-ai-extended[1] and Alink[2] as you noticed.
> > >
> > > - flink-ai-extended aims to provide a solution to unify big data and AI
> > > (especially deep learning) based on Flink.
> > > - Alink is a classic machine learning library.
> > >
> > > We have seen some unique values of these projects in scenarios such as
> > > realtime machine learning, as well as unifying the online/offline
> > systems.
> > > That said, because Flink AI is still kind of new and we are still
> > exploring
> > > many things, we would like to wait a little bit until the APIs are more
> > > stable before putting them into Apache Flink.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > [1] https://github.com/alibaba/flink-ai-extended
> > > [2] https://github.com/alibaba/Alink
> > >
> > >
> > > On Thu, Jan 7, 2021 at 7:53 PM Flavio Pompermaier <
> pomperma...@okkam.it>
> > > wrote:
> > >
> > > > Or also https://github.com/alibaba/Alink, I don't know if the 2 are
> > > > related
> > > > somehow..
> > > >
> > > > On Thu, Jan 7, 2021 at 9:55 AM Flavio Pompermaier <
> > pomperma...@okkam.it>
> > > > wrote:
> > > >
> > > > > What about Flink-AI [1]? Would you suggest its adoption Till?
> > > > >
> > > > > [1] https://github.com/alibaba/flink-ai-extended
> > > > >
> > > > > On Thu, Jan 7, 2021 at 9:38 AM Till Rohrmann  >
> > > > wrote:
> > > > >
> > > > >> HI Badrul,
> > > > >>
> > > > >> FlinkML is unfortunately no longer under active development.
> > However,
> > > > >> there
> > > > >> is some new effort to add a machine learning library to Flink [1].
> > > > >>
> > > > >> [1]
> > > > >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >> On Wed, Jan 6, 2021 at 7:11 PM Badrul Chowdhury <
> > > > >> badrulchowdhur...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Hi,
> > > > >> >
> > > > >> > I see that the last update to the roadmap for FlinkML was some
> > time
> > > > ago
> > > > >> > (2016). Is development still active? If so, I would like to
> > > contribute
> > > > >> some
> > > > >> > unsupervised clustering algorithms like CLARANS. Would love some
> > > > >> pointers.
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Badrul
> > > > >> >
> > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Cheers,
> > Badrul
> >
>


-- 

Cheers,
Badrul


[jira] [Created] (FLINK-21263) Job hangs under backpressure

2021-02-03 Thread Lu Niu (Jira)
Lu Niu created FLINK-21263:
--

 Summary: Job hangs under backpressure
 Key: FLINK-21263
 URL: https://issues.apache.org/jira/browse/FLINK-21263
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.11.0
Reporter: Lu Niu
 Attachments: source_graph.svg, source_js1, source_js2, source_js3

We have a Flink job that runs fine for a few days but then suddenly hangs and 
never recovers. Once we relaunch the job, it runs fine again. We detected 
backpressure on the job, but normally backpressure only leads to slower 
consumption; what is weird here is that the job made no progress at all. The 
symptoms look similar to FLINK-20618.

 

About the job:
1. Reads from Kafka and writes to Kafka

2. version 1.11

3. enabled unaligned checkpoint

 

symptoms:
 # All source/sink throughput drops to 0
 # All checkpoints fail immediately after triggering.
 # Backpressure shows "high" from the source to the two downstream operators. 
 # The flame graph shows all subtask threads are waiting
 # The source jstack shows the source thread is parked (WAITING), as below.

{code:java}
Source: impression-reader -> impression-filter -> impression-data-conversion (1/60)
Stack Trace is:
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0003a3e71330> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)
at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:135)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425)
- locked <0x0006a485dab0> (a java.lang.Object)
at org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter.emitWatermark(SourceContextWatermarkOutputAdapter.java:37)
at org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.updateCombinedWatermark(WatermarkOutputMultiplexer.java:167)
at org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer.onPeriodicEmit(WatermarkOutputMultiplexer.java:136)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$PeriodicWatermarkEmitter.onProcessingTime(AbstractFetcher.java:574)
- locked <0x0006a485dab0> (a java.lang.Object)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1211)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$590/1066788035.run(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStream

[jira] [Created] (FLINK-21262) Remove SlotPool.suspend

2021-02-03 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-21262:
-

 Summary: Remove SlotPool.suspend
 Key: FLINK-21262
 URL: https://issues.apache.org/jira/browse/FLINK-21262
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.13.0


Since the completion of FLINK-11719 we no longer need to suspend the 
{{SlotPool}}. Hence, I suggest removing this method.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-03 Thread Jark Wu
Thanks for the summary. LGTM.

On Wed, 3 Feb 2021 at 20:25, Rui Li  wrote:

> Thanks Jane for the summary. Looks good to me.
>
> On Wed, Feb 3, 2021 at 7:53 PM Jane Chan  wrote:
>
> > Hi @Jark, @Timo, I've updated the comments, and please have a look when
> > you're free.
> >
> > Best,
> > Jane
> >
> > On Wed, Feb 3, 2021 at 7:14 PM Jane Chan  wrote:
> >
> > >
> > > Reply @Timo
> > >
> > >> Remove the `used` column for SHOW MODULES. It will always show true.
> > >>
> > > Good catch. It's a copy-paste typo, and I forgot to remove that column.
> > >
> > > How about creating a POJO (static inner class of ModuleManager) called
> > >> `ModuleEntry` or similar.
> > >>
> > > +1 for better encapsulation.
> > >
> > > Reply @Jark
> > >
> > >> A minor comment on `useModules(List<String> names)`, would be better
> to
> > >> use varargs here to a more fluent API: `useModules("a", "b", "c")`.
> > >>
> > >  +1, and that's better.
> > >
> > > Do we also need to add these new methods (useModules, listFullModules)
> > >> to TableEnvironment?
> > >>
> > > Yes, indeed.
> > >
> > > Thank you all for polishing this proposal to make it more thorough.
> > >
> > > Best,
> > > Jane
> > >
> > > On Wed, Feb 3, 2021 at 6:41 PM Jark Wu  wrote:
> > >
> > >> A minor comment on `useModules(List<String> names)`,
> > >> would be better to use varargs here to a more fluent API:
> > `useModules("a",
> > >> "b", "c")`.
> > >>
> > >> Besides, do we also need to add these new methods (useModules,
> > >> listFullModules) to
> > >> TableEnvironment?
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Wed, 3 Feb 2021 at 18:36, Timo Walther  wrote:
> > >>
> > >> > Thanks for the nice summary Jane. The summary looks great. Some
> minor
> > >> > feedback:
> > >> >
> > >> > - Remove the `used` column for SHOW MODULES. It will always show
> true.
> > >> >
> > >> > - `List<Pair<String, Boolean>> listFullModules()` is a very long
> > >> > signature. And `Pair` should be avoided in code because it is not
> very
> > >> > descriptive. How about creating a POJO (static inner class of
> > >> > ModuleManager) called `ModuleEntry` or similar.
> > >> >
> > >> > Otherwise +1 for the proposal.
> > >> >
> > >> > Regards,
> > >> > Timo
> > >> >
> > >> > On 03.02.21 11:24, Jane Chan wrote:
> > >> > > Hi everyone,
> > >> > >
> > >> > > I did a summary on the Jira issue page [1] since the discussion
> has
> > >> > > achieved a consensus. If there is anything missed or not
> corrected,
> > >> > please
> > >> > > let me know.
> > >> > >
> > >> > > [1] https://issues.apache.org/jira/browse/FLINK-21045#
> > >> > >
> > >> > > Best,
> > >> > > Jane
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Wed, Feb 3, 2021 at 1:33 PM Jark Wu  wrote:
> > >> > >
> > >> > >> Hi Jane,
> > >> > >>
> > >> > >> Yes. I think we should fail fast.
> > >> > >>
> > >> > >> Best,
> > >> > >> Jark
> > >> > >>
> > >> > >> On Wed, 3 Feb 2021 at 12:06, Jane Chan 
> > >> wrote:
> > >> > >>
> > >> > >>> Hi everyone,
> > >> > >>>
> > >> > >>> Thanks for the discussion to make this improvement plan clearer.
> > >> > >>>
> > >> > >>> Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion
> > >> > summaries
> > >> > >>> now and want to confirm one thing that for the statement `USE
> > >> MODULES x
> > >> > >> [,
> > >> > >>> y, z, ...]`, if the module name list contains a nonexistent
> module,
> > >> > shall
> > >> > >> we
> > >> > >>> #1 fail the execution for all of them or #2 enabled the rest
> > modules
> > >> > and
> > >> > >>> return a warning to users? My personal preference goes to #1 for
> > >> > >>> simplicity. What do you think?
> > >> > >>>
> > >> > >>> Best,
> > >> > >>> Jane
> > >> > >>>
> > >> > >>> On Tue, Feb 2, 2021 at 3:53 PM Timo Walther  >
> > >> > wrote:
> > >> > >>>
> > >> >  +1
> > >> > 
> > >> >  @Jane Can you summarize our discussion in the JIRA issue?
> > >> > 
> > >> >  Thanks,
> > >> >  Timo
> > >> > 
> > >> > 
> > >> >  On 02.02.21 03:50, Jark Wu wrote:
> > >> > > Hi Timo,
> > >> > >
> > >> > >> Another question is whether a LOAD operation also adds the
> > >> module to
> > >> > >>> the
> > >> > > enabled list by default?
> > >> > >
> > >> > > I would like to add the module to the enabled list by default,
> > the
> > >> > >> main
> > >> > > reasons are:
> > >> > > 1) Reordering is an advanced requirement, adding modules needs
> > >> > >>> additional
> > >> > > USE statements with "core" module
> > >> > >sounds too burdensome. Most users should be satisfied with
> > only
> > >> > >> LOAD
> > >> > > statements.
> > >> > > 2) We should keep compatible for
> TableEnvironment#loadModule().
> > >> > > 3) We are using the LOAD statement instead of CREATE, so I
> think
> > >> it's
> > >> >  fine
> > >> > > that it does some implicit things.
> > >> > >
> > >> > > Best,
> > >> > > Jark
> > >> > >
> > >> > > On Tue, 2 Feb 2021 at 00:48, Timo Walther  >
> > >> > >> wrote:
> > >> > 

Re: [DISCUSS]FLIP-163: SQL Client Improvements

2021-02-03 Thread Timo Walther

Hi everyone,

some feedback regarding the open questions. Maybe we can discuss the 
`TableEnvironment.executeMultiSql` story offline to determine how we 
proceed with this in the near future.


1) "whether the table environment has the ability to update itself"

Maybe there was some misunderstanding. I don't think that we should 
support `tEnv.getConfig.getConfiguration.setString("table.planner", 
"old")`. Instead I'm proposing to support 
`TableEnvironment.create(Configuration)` where planner and execution 
mode are read immediately and subsequent changes to these options will 
have no effect. We are doing it similar in `new 
StreamExecutionEnvironment(Configuration)`. These two ConfigOption's 
must not be SQL Client specific but can be part of the core table code 
base. Many users would like to get a 100% preconfigured environment from 
just Configuration. And this is not possible right now. We can solve 
both use cases in one change.
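
As a rough sketch of the proposed usage (the `create(Configuration)` 
overload does not exist yet, and the two option keys are the ones under 
discussion, so treat all of this as illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public final class PreconfiguredEnvironment {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Proposed options, read exactly once when the environment is created.
        conf.setString("table.planner", "blink");
        conf.setString("table.execution-mode", "streaming");

        // Proposed overload: a 100% preconfigured environment from Configuration.
        TableEnvironment env = TableEnvironment.create(conf);

        // A later change to these options would have no effect on `env`.
        env.getConfig().getConfiguration().setString("table.planner", "old");
    }
}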


2) "the sql client, we will maintain two parsers"

I remember we had some discussion about this and decided that we would 
like to maintain only one parser. In the end it is "One Flink SQL" where 
commands influence each other also with respect to keywords. It should 
be fine to include the SQL Client commands in the Flink parser. Of 
course the table environment would not be able to handle the `Operation` 
instance that would be the result but we can introduce hooks to handle 
those `Operation`s. Or we introduce parser extensions.


Can we skip `table.job.async` in the first version? We should further 
discuss whether to introduce a special SQL clause for wrapping async 
behavior or to use a config option. Esp. for streaming queries we 
need to be careful and should force users to either "one INSERT INTO" or 
"one STATEMENT SET".


3) 4) "HIVE also uses these commands"

In general, Hive is not a good reference. Aligning the commands more 
with the remaining commands should be our goal. We just had a MODULE 
discussion where we selected SHOW instead of LIST. But it is true that 
JARs are not part of the catalog which is why I would not use 
CREATE/DROP. ADD/REMOVE are commonly siblings in the English language. 
Take a look at the Java collection API as another example.


6) "Most of the commands should belong to the table environment"

Thanks for updating the FLIP; this makes things easier to understand. It 
is good to see that most commands will be available in TableEnvironment. 
However, I would also support SET and RESET for consistency. Again, from 
an architectural point of view, if we allowed some kind of 
`Operation` hook in the table environment, we could check for SQL Client 
specific options and forward to regular `TableConfig.getConfiguration` 
otherwise. What do you think?


Regards,
Timo


On 03.02.21 08:58, Jark Wu wrote:

Hi Timo,

I will respond some of the questions:

1) SQL client specific options

Whether it starts with "table" or "sql-client" depends on where the
configuration takes effect.
If it is a table configuration, we should make clear what the behavior is
when users change
the configuration during the lifecycle of the TableEnvironment.

I agree with Shengkai `sql-client.planner` and `sql-client.execution.mode`
are something special
that can't be changed after TableEnvironment has been initialized. You can
see
`StreamExecutionEnvironment` provides a `configure()` method to override
configuration after
StreamExecutionEnvironment has been initialized.

Therefore, I think it would be better to still use  `sql-client.planner`
and `sql-client.execution.mode`.

2) Execution file


From my point of view, there is a big difference between
`sql-client.job.detach` and `TableEnvironment.executeMultiSql()`:
`sql-client.job.detach` will affect every single DML statement in the
terminal, not only the statements in SQL files. I think a single DML
statement in the interactive terminal is something like tEnv#executeSql()
instead of tEnv#executeMultiSql().
So I don't like the "multi" and "sql" keywords in `table.multi-sql-async`.
I just found that the runtime provides a configuration called
"execution.attached" [1], which is false by default and
specifies whether the pipeline is submitted in attached or detached mode.
It provides exactly the same functionality as `sql-client.job.detach`.
What do you think about using this option?
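
If we went that route, setting it could be as simple as the following sketch
(whether the table runtime actually honors the option there is exactly the
open question above, so this is illustrative only):

import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.table.api.TableEnvironment;

final class AttachedModeSketch {
    // DeploymentOptions.ATTACHED is the typed handle for "execution.attached".
    static void enableAttachedExecution(TableEnvironment tEnv) {
        tEnv.getConfig().getConfiguration().set(DeploymentOptions.ATTACHED, true);
    }
}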

If we also want to support this config in TableEnvironment, I think it
should also affect the DML execution of `tEnv#executeSql()`, not only the
DMLs in `tEnv#executeMultiSql()`.
Therefore, the behavior may look like this:

val tableResult = tEnv.executeSql("INSERT INTO ...")  ==> async by default
tableResult.await()   ==> manually block until finish
tEnv.getConfig().getConfiguration().setString("execution.attached", "true")
val tableResult2 = tEnv.executeSql("INSERT INTO ...")  ==> sync, don't need
to wait on the TableResult
tEnv.executeMultiSql(
"""
CREATE TABLE   ==> always sync
INSERT INTO ...  => sync, because we set configuration above
SE

[RESULT][VOTE] FLIP-159: Reactive Mode

2021-02-03 Thread Robert Metzger
Thank you all for reviewing and voting on the FLIP!

It has been accepted!

On Wed, Feb 3, 2021 at 4:58 AM Yu Li  wrote:

> +1 (binding)
>
> Thanks for the efforts Robert!
>
> Best Regards,
> Yu
>
>
> On Mon, 1 Feb 2021 at 17:19, Matthias Pohl  wrote:
>
> > Thanks Robert and congratulations on your first FLIP.
> > +1 (non-binding)
> >
> > Matthias
> >
> > On Mon, Feb 1, 2021 at 4:22 AM Zhu Zhu  wrote:
> >
> > > +1 (binding)
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Till Rohrmann  于2021年1月29日周五 下午10:23写道:
> > >
> > > > LGTM. Thanks for the work Robert!
> > > >
> > > > +1 (binding)
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Jan 28, 2021 at 11:27 AM Yang Wang 
> > > wrote:
> > > >
> > > > > Thanks Robert for your great work on this FLIP. This is really a
> big
> > > step
> > > > > to make Flink auto scalable.
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > >
> > > > > Best,
> > > > > Yang
> > > > >
> > > > > Robert Metzger  于2021年1月28日周四 下午4:32写道:
> > > > >
> > > > > > @Yangze: That's something I overlooked. I should have waited. If
> > > > FLIP-160
> > > > > > is rejected or undergoes fundamental changes, I'll cancel this
> vote
> > > and
> > > > > > rewrite FLIP-159.
> > > > > > But I have the impression that there were no major concerns
> > regarding
> > > > > > FLIP-160 so far.
> > > > > >
> > > > > > On Thu, Jan 28, 2021 at 8:46 AM Yangze Guo 
> > > wrote:
> > > > > >
> > > > > > > Thanks for driving this, Robert! LGTM.
> > > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > minor: Just a little confused about the process. It seems this
> > > > > > > proposal relies on FLIP-160, which is still under
> discussion.
> > > > > > > Should we always vote for the prerequisite first?
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jan 28, 2021 at 3:27 PM Xintong Song <
> > > tonysong...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Thanks Robert. LGTM.
> > > > > > > >
> > > > > > > > +1 (binding)
> > > > > > > >
> > > > > > > > Thank you~
> > > > > > > >
> > > > > > > > Xintong Song
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Jan 28, 2021 at 2:50 PM Robert Metzger <
> > > > rmetz...@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > since the discussion [1] about FLIP-159 [2] seems to have
> > > > reached a
> > > > > > > > > consensus, I'd like to start a formal vote for the FLIP.
> > > > > > > > >
> > > > > > > > > Please vote +1 to approve the FLIP, or -1 with a comment.
> The
> > > > vote
> > > > > > > will be
> > > > > > > > > open at least until Tuesday, Feb 2nd.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Robert
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
> > > > > > > > > [2]
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>


Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-03 Thread Stephan Ewen
Hi all!

A quick thought on this thread: We see a typical stalemate here, as in so
many discussions recently.
One developer prefers it one way, another prefers it another way. Both have
pro/con arguments, it takes a lot of time from everyone, and still there is
little progress in the discussion.

Ultimately, this can only be decided by talking to the users. And it
would also be the best way to ensure that what we build is the intuitive
and expected way for users.
The less the users are into the deep aspects of Flink SQL, the better they
can mirror what a common user would expect (a power user will figure it
out anyway).
Let's find a person to drive that, spell it out in the FLIP as "semantics
TBD", and focus on the implementation of the parts that are agreed upon.

For interviewing the users, here are some ideas for questions to look at:
  - How do they view the trade-off between stable semantics and
out-of-the-box magic (getting started faster)?
  - How comfortable are they with the different meaning of "now()" in
a streaming versus a batch context?
  - What would be their expectation when moving a query with the time
functions ("now()") from an unbounded stream (Kafka source without end
offset) to a bounded stream (Kafka source with end offsets), which may
switch execution to batch?

Best,
Stephan


On Tue, Feb 2, 2021 at 3:19 PM Jark Wu  wrote:

> Hi Fabian,
>
> I think we have an agreement that the functions should be evaluated at
> query start in batch mode.
> Because all the other batch systems and traditional databases are this
> behavior, which is standard SQL compliant.
>
> *1. The different point of view is what's the behavior in streaming mode? *
>
> From my point of view, I don't see any meaningful semantics in evaluating at
> query start for a 365-day long-running streaming job.
> And from my observation, CURRENT_TIMESTAMP is heavily used by Flink
> streaming users and they expect the current behaviors.
> The SQL standard only provides a guideline for traditional batch systems;
> however, Flink is a leading stream processing system
> which is out of the scope of the SQL standard, and Flink should define the
> streaming standard. I think a standard should follow users' intuition.
> Therefore, I think we don't need to be standard SQL compliant at this point
> because users don't expect it.
> Changing the behavior of the functions to evaluate at query start for
> streaming mode will hurt most Flink SQL users while we have nothing to
> gain,
> so we should avoid this.
>
> *2. Does it break the unified streaming-batch semantics? *
>
> I don't think so. First of all, what's the unified streaming-batch
> semantic?
> I think it means the *eventual result* instead of the *behavior*.
> It's hard to say we have provided unified behavior for streaming and batch
> jobs,
> because for example unbounded aggregate behaves very differently.
> In batch mode, it only evaluates once for the bounded data and emits the
> aggregate result once.
>  But in streaming mode, it evaluates for each row and emits the updated
> result.
> What we have always emphasized by "unified streaming-batch semantics" is [1]
>
> > a query produces exactly the same result regardless whether its input is
> static batch data or streaming data.
>
> From my understanding, the "semantic" means the "eventual result".
> And time functions are non-deterministic, so it's reasonable to get
> different results for batch and streaming mode.
> Therefore, I think it doesn't break the unified streaming-batch semantics
> to evaluate per record in streaming mode and at query start in batch mode,
> as "semantics" here refers to the eventual result, not the behavior.
>
> Best,
> Jark
>
> [1]: https://flink.apache.org/news/2017/04/04/dynamic-tables.html
>
> On Tue, 2 Feb 2021 at 18:34, Fabian Hueske  wrote:
>
> > Hi everyone,
> >
> > Sorry for joining this discussion late.
> > Let me give some thought to two of the arguments raised in this thread.
> >
> > Time functions are inherently non-determintistic:
> > --
> > This is of course true, but IMO it doesn't mean that the semantics of
> time
> > functions do not matter.
> > It makes a difference whether a function is evaluated once and its
> result
> > is reused or whether it is invoked for every record.
> > Would you use the same logic to justify different behavior of RAND() in
> > batch and streaming queries?
> >
> > Provide the semantics that most users expect:
> > --
> > I don't think it is clear what most users expect, esp. if we also include
> > future users (which we certainly want to gain) into this assessment.
> > Our current users got used to the semantics that we introduced. So I
> > wouldn't be surprised if they would say stick with the current semantics.
> > However, we are also claiming standard SQL compliance and stress the goal
> > of batch-stream unification.
> > So I would assume that new SQL users expect standard compliant behavior
> for
> > batch and streaming queries.
> >
> >
> > IMO, we should try hard to stick to our goals of 1) uni

Re: Proposal to add Google Cloud Storage FileSystem with RecoverableWriter

2021-02-03 Thread Till Rohrmann
Hmm, maybe it was deleted again. I received an email notification yesterday.

On Wed, Feb 3, 2021 at 11:57 AM Xintong Song  wrote:

> @Till,
> Did I overlook anything? I don't find Galen's post on the old PR.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Feb 3, 2021 at 6:10 PM Till Rohrmann  wrote:
>
> > @Galen, I've just seen that you posted your ideas on the old Github PR. I
> > think it would be better to post it on the JIRA ticket [1].
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11838
> >
> > Cheers,
> > Till
> >
> > On Tue, Feb 2, 2021 at 12:02 PM Xintong Song 
> > wrote:
> >
> > > Hi Galen,
> > >
> > > Thanks for offering the contribution.
> > >
> > > As Till has already suggested, please comment on FLINK-11838 your
> > solution
> > > proposal.
> > > Once we reach consensus on the proposal, I'll assign you to the ticket.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Feb 2, 2021 at 5:19 PM Till Rohrmann 
> > wrote:
> > >
> > > > Hi Galen,
> > > >
> > > > I think that adding support for GCS using the StreamingFileSink
> sounds
> > > like
> > > > a very good idea to me. Looking at FLINK-11838 I believe that this
> > effort
> > > > has been abandoned. I think that you could take this ticket over if
> you
> > > > want. Maybe you could update this ticket with your solution proposal.
> > > >
> > > > I will check whether I can find a committer who could help you with
> > this
> > > > effort.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Sat, Jan 30, 2021 at 7:43 PM Galen Warren <
> ga...@cvillewarrens.com>
> > > > wrote:
> > > >
> > > > > Hi -- I'm wondering if you would be interested in a contribution to
> > > add a
> > > > > HadoopFileSystem implementation, with associated RecoverableWriter,
> > for
> > > > > Google Cloud Storage. This would be similar to what's already in
> > place
> > > > for
> > > > > S3, and it would allow writing to GCS using a StreamingFileSink.
> > > > >
> > > > > I see there's been some work on this before (FLINK-11838 Add GCS
> > > > > RecoverableWriter by Fokko · Pull Request #7915 · apache/flink (
> > > > github.com
> > > > > )), but the original
> people
> > > > > working on it have put it on hold, and the last activity was over
> six
> > > > > months ago.
> > > > >
> > > > > I need this for my own purposes and I have an implementation that
> I'm
> > > > > working on locally. I'd be interested to contribute this if you'd
> be
> > > > > interested. Let me know if so and I'll create a Jira ticket.
> > > > >
> > > > > Thanks,
> > > > > Galen Warren
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-21261) Improve digest of physical Expand node

2021-02-03 Thread Jark Wu (Jira)
Jark Wu created FLINK-21261:
---

 Summary: Improve digest of physical Expand node
 Key: FLINK-21261
 URL: https://issues.apache.org/jira/browse/FLINK-21261
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.13.0


Currently, the digest of {{StreamPhysicalExpand}} only generates field names, 
which loses much useful information, e.g. null fields, the expand id, and the 
number of expansions. 

{code}
Expand(projects=[a, b, c, $f3, $f4, $e])
{code}

The digest of {{BatchPhysicalExpand}} additionally generates the projects 
list, but the first {{projects}} entry is redundant information, so we can 
remove it. 

{code}
Expand(projects=[a, c, $f2, d, $e, $f2_0], projects=[{a, c, $f2, d, 0 AS $e, 
$f2 AS $f2_0}, {a, c, null AS $f2, null AS d, 3 AS $e, $f2 AS $f2_0}])
{code}

The proposed digest of expand node would be:

{code}
Expand(projects=[{a, c, $f2, d, 0 AS $e, $f2 AS $f2_0}, {a, c, null AS $f2, 
null AS d, 3 AS $e, $f2 AS $f2_0}])
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-03 Thread Rui Li
Thanks Jane for the summary. Looks good to me.

On Wed, Feb 3, 2021 at 7:53 PM Jane Chan  wrote:

> Hi @Jark, @Timo, I've updated the comments, and please have a look when
> you're free.
>
> Best,
> Jane
>
> On Wed, Feb 3, 2021 at 7:14 PM Jane Chan  wrote:
>
> >
> > Reply @Timo
> >
> >> Remove the `used` column for SHOW MODULES. It will always show true.
> >>
> > Good catch. It's a copy-paste typo, and I forgot to remove that column.
> >
> > How about creating a POJO (static inner class of ModuleManager) called
> >> `ModuleEntry` or similar.
> >>
> > +1 for better encapsulation.
> >
> > Reply @Jark
> >
> >> A minor comment on `useModules(List<String> names)`, would be better to
> >> use varargs here to a more fluent API: `useModules("a", "b", "c")`.
> >>
> >  +1, and that's better.
> >
> > Do we also need to add these new methods (useModules, listFullModules)
> >> to TableEnvironment?
> >>
> > Yes, indeed.
> >
> > Thank you all for polishing this proposal to make it more thorough.
> >
> > Best,
> > Jane
> >
> > On Wed, Feb 3, 2021 at 6:41 PM Jark Wu  wrote:
> >
>> A minor comment on `useModules(List<String> names)`,
> >> would be better to use varargs here to a more fluent API:
> `useModules("a",
> >> "b", "c")`.
> >>
> >> Besides, do we also need to add these new methods (useModules,
> >> listFullModules) to
> >> TableEnvironment?
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 3 Feb 2021 at 18:36, Timo Walther  wrote:
> >>
> >> > Thanks for the nice summary Jane. The summary looks great. Some minor
> >> > feedback:
> >> >
> >> > - Remove the `used` column for SHOW MODULES. It will always show true.
> >> >
> >> > - `List<Pair<String, Boolean>> listFullModules()` is a very long
> >> > signature. And `Pair` should be avoided in code because it is not very
> >> > descriptive. How about creating a POJO (static inner class of
> >> > ModuleManager) called `ModuleEntry` or similar.
> >> >
> >> > Otherwise +1 for the proposal.
> >> >
> >> > Regards,
> >> > Timo
> >> >
> >> > On 03.02.21 11:24, Jane Chan wrote:
> >> > > Hi everyone,
> >> > >
> >> > > I did a summary on the Jira issue page [1] since the discussion has
> >> > > achieved a consensus. If there is anything missed or not corrected,
> >> > please
> >> > > let me know.
> >> > >
> >> > > [1] https://issues.apache.org/jira/browse/FLINK-21045#
> >> > >
> >> > > Best,
> >> > > Jane
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Wed, Feb 3, 2021 at 1:33 PM Jark Wu  wrote:
> >> > >
> >> > >> Hi Jane,
> >> > >>
> >> > >> Yes. I think we should fail fast.
> >> > >>
> >> > >> Best,
> >> > >> Jark
> >> > >>
> >> > >> On Wed, 3 Feb 2021 at 12:06, Jane Chan 
> >> wrote:
> >> > >>
> >> > >>> Hi everyone,
> >> > >>>
> >> > >>> Thanks for the discussion to make this improvement plan clearer.
> >> > >>>
> >> > >>> Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion
> >> > summaries
> >> > >>> now and want to confirm one thing that for the statement `USE
> >> MODULES x
> >> > >> [,
> >> > >>> y, z, ...]`, if the module name list contains a nonexistent module,
> >> > shall
> >> > >> we
> >> > >>> #1 fail the execution for all of them or #2 enabled the rest
> modules
> >> > and
> >> > >>> return a warning to users? My personal preference goes to #1 for
> >> > >>> simplicity. What do you think?
> >> > >>>
> >> > >>> Best,
> >> > >>> Jane
> >> > >>>
> >> > >>> On Tue, Feb 2, 2021 at 3:53 PM Timo Walther 
> >> > wrote:
> >> > >>>
> >> >  +1
> >> > 
> >> >  @Jane Can you summarize our discussion in the JIRA issue?
> >> > 
> >> >  Thanks,
> >> >  Timo
> >> > 
> >> > 
> >> >  On 02.02.21 03:50, Jark Wu wrote:
> >> > > Hi Timo,
> >> > >
> >> > >> Another question is whether a LOAD operation also adds the
> >> module to
> >> > >>> the
> >> > > enabled list by default?
> >> > >
> >> > > I would like to add the module to the enabled list by default,
> the
> >> > >> main
> >> > > reasons are:
> >> > > 1) Reordering is an advanced requirement, adding modules needs
> >> > >>> additional
> >> > > USE statements with "core" module
> >> > >sounds too burdensome. Most users should be satisfied with
> only
> >> > >> LOAD
> >> > > statements.
> >> > > 2) We should keep compatible for TableEnvironment#loadModule().
> >> > > 3) We are using the LOAD statement instead of CREATE, so I think
> >> it's
> >> >  fine
> >> > > that it does some implicit things.
> >> > >
> >> > > Best,
> >> > > Jark
> >> > >
> >> > > On Tue, 2 Feb 2021 at 00:48, Timo Walther 
> >> > >> wrote:
> >> > >
> >> > >> Not the module itself but the ModuleManager should handle this
> >> case,
> >> >  yes.
> >> > >>
> >> > >> Regards,
> >> > >> Timo
> >> > >>
> >> > >>
> >> > >> On 01.02.21 17:35, Jane Chan wrote:
> >> > >>> +1 to Jark's proposal
> >> > >>>
> >> > >>> To make it clearer,  will `module#getFunctionDefinition()`
> >> > >> return
> >> >  empty
> >> > >>> sup

PyFlink DataStream API

2021-02-03 Thread Partha Mishra
Hi,

Can anyone tell me whether the PyFlink DataStream APIs (available since Flink 
1.12) are properly tested and used in any production environment/use case 
without issues, and whether they give the expected performance?



Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-03 Thread Jane Chan
Hi @Jark, @Timo, I've updated the comments, and please have a look when
you're free.

Best,
Jane

On Wed, Feb 3, 2021 at 7:14 PM Jane Chan  wrote:

>
> Reply @Timo
>
>> Remove the `used` column for SHOW MODULES. It will always show true.
>>
> Good catch. It's a copy-paste typo, and I forgot to remove that column.
>
> How about creating a POJO (static inner class of ModuleManager) called
>> `ModuleEntry` or similar.
>>
> +1 for better encapsulation.
>
> Reply @Jark
>
>> A minor comment on `useModules(List<String> names)`, would be better to
>> use varargs here to a more fluent API: `useModules("a", "b", "c")`.
>>
>  +1, and that's better.
>
> Do we also need to add these new methods (useModules, listFullModules)
>> to TableEnvironment?
>>
> Yes, indeed.
>
> Thank you all for polishing this proposal to make it more thorough.
>
> Best,
> Jane
>
> On Wed, Feb 3, 2021 at 6:41 PM Jark Wu  wrote:
>
>> A minor comment on `useModules(List<String> names)`,
>> would be better to use varargs here to a more fluent API: `useModules("a",
>> "b", "c")`.
>>
>> Besides, do we also need to add these new methods (useModules,
>> listFullModules) to
>> TableEnvironment?
>>
>> Best,
>> Jark
>>
>> On Wed, 3 Feb 2021 at 18:36, Timo Walther  wrote:
>>
>> > Thanks for the nice summary Jane. The summary looks great. Some minor
>> > feedback:
>> >
>> > - Remove the `used` column for SHOW MODULES. It will always show true.
>> >
>> > - `List<Pair<String, Boolean>> listFullModules()` is a very long
>> > signature. And `Pair` should be avoided in code because it is not very
>> > descriptive. How about creating a POJO (static inner class of
>> > ModuleManager) called `ModuleEntry` or similar.
>> >
>> > Otherwise +1 for the proposal.
>> >
>> > Regards,
>> > Timo
>> >
>> > On 03.02.21 11:24, Jane Chan wrote:
>> > > Hi everyone,
>> > >
>> > > I did a summary on the Jira issue page [1] since the discussion has
>> > > achieved a consensus. If there is anything missed or not corrected,
>> > please
>> > > let me know.
>> > >
>> > > [1] https://issues.apache.org/jira/browse/FLINK-21045#
>> > >
>> > > Best,
>> > > Jane
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > On Wed, Feb 3, 2021 at 1:33 PM Jark Wu  wrote:
>> > >
>> > >> Hi Jane,
>> > >>
>> > >> Yes. I think we should fail fast.
>> > >>
>> > >> Best,
>> > >> Jark
>> > >>
>> > >> On Wed, 3 Feb 2021 at 12:06, Jane Chan 
>> wrote:
>> > >>
>> > >>> Hi everyone,
>> > >>>
>> > >>> Thanks for the discussion to make this improvement plan clearer.
>> > >>>
>> > >>> Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion
>> > summaries
>> > >>> now and want to confirm one thing that for the statement `USE
>> MODULES x
>> > >> [,
>> > >>> y, z, ...]`, if the module name list contains a nonexistent module,
>> > shall
>> > >> we
>> > >>> #1 fail the execution for all of them or #2 enabled the rest modules
>> > and
>> > >>> return a warning to users? My personal preference goes to #1 for
>> > >>> simplicity. What do you think?
>> > >>>
>> > >>> Best,
>> > >>> Jane
>> > >>>
>> > >>> On Tue, Feb 2, 2021 at 3:53 PM Timo Walther 
>> > wrote:
>> > >>>
>> >  +1
>> > 
>> >  @Jane Can you summarize our discussion in the JIRA issue?
>> > 
>> >  Thanks,
>> >  Timo
>> > 
>> > 
>> >  On 02.02.21 03:50, Jark Wu wrote:
>> > > Hi Timo,
>> > >
>> > >> Another question is whether a LOAD operation also adds the
>> module to
>> > >>> the
>> > > enabled list by default?
>> > >
>> > > I would like to add the module to the enabled list by default, the
>> > >> main
>> > > reasons are:
>> > > 1) Reordering is an advanced requirement, adding modules needs
>> > >>> additional
>> > > USE statements with "core" module
>> > >sounds too burdensome. Most users should be satisfied with only
>> > >> LOAD
>> > > statements.
>> > > 2) We should keep compatible for TableEnvironment#loadModule().
>> > > 3) We are using the LOAD statement instead of CREATE, so I think
>> it's
>> >  fine
>> > > that it does some implicit things.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Tue, 2 Feb 2021 at 00:48, Timo Walther 
>> > >> wrote:
>> > >
>> > >> Not the module itself but the ModuleManager should handle this
>> case,
>> >  yes.
>> > >>
>> > >> Regards,
>> > >> Timo
>> > >>
>> > >>
>> > >> On 01.02.21 17:35, Jane Chan wrote:
>> > >>> +1 to Jark's proposal
>> > >>>
>> > >>> To make it clearer,  will `module#getFunctionDefinition()`
>> > >> return
>> >  empty
>> > >>> suppose the module is loaded but not enabled?
>> > >>>
>> > >>> Best,
>> > >>> Jane
>> > >>>
>> > >>> On Mon, Feb 1, 2021 at 10:02 PM Timo Walther <
>> twal...@apache.org>
>> >  wrote:
>> > >>>
>> >  +1 to Jark's proposal
>> > 
>> >  I like the difference between just loading and actually
>> enabling
>> > >>> these
>> >  modules.
>> > 
>> >  @Rui: I would use the same b

Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Kezhu Wang
Hi Till,

Based on what I understood, if I'm not wrong, the door is not closed after SSG
resource specifying. So I hope it could be useful for potential future
improvements.

Best,
Kezhu Wang


On February 3, 2021 at 18:07:21, Till Rohrmann (trohrm...@apache.org) wrote:

Thanks for sharing your thoughts Kezhu. I like your ideas of how
per-operator and SSG requirements can be combined. I've also thought about
defining a default resource profile for all tasks which have no resources
configured. That way all operators would have resources assigned if the
user chooses to use this feature.

As Yangze and Xintong have said, we have decided to first only support
specifying resources for SSGs as this seems more user friendly. Based on
the feedback for this feature one potential development direction might be
to allow the resource specification on per-operator basis. Here we could
pick up your ideas.

Cheers,
Till

On Wed, Feb 3, 2021 at 7:31 AM Xintong Song  wrote:

> Thanks for your feedback, Kezhu.
>
> I think Flink *runtime* already has an ideal granularity for resource
> > management 'task'. If there is
> > a slot shared by multiple tasks, that slot's resource requirement is
> simple
> > sum of all its logical
> > slots. So basically, there is no resource requirement for
SlotSharingGroup
> > in runtime until now,
> > right ?
>
> That is a half-baked implementation, coming from the previous attempts
> (years ago) trying to deliver the fine-grained resource management
feature,
> and never really put into use.
>
> From the FLIP and discussion, I assume that SSG resource specifying will
> > override operator level
> > resource specifying if both are specified ?
> >
> Actually, I think we should use the finer-grained resources (i.e.
operator
> level) if both are specified. And more importantly, that is based on the
> assumption that we do need two different levels of interfaces.
>
> So, I wonder whether we could interpret SSG resource specifying as an
"add"
> > but not an "set" on
> > resource requirement ?
> >
> IIUC, this is the core idea behind your proposal. I think it provides an
> interesting idea of how we combine operator level and SSG level
resources,
> *if
> we allow configuring resources at both levels*. However, I'm not sure
> whether the configuring resources on the operator level is indeed needed.
> Therefore, as a first step, this FLIP proposes to only introduce the
> SSG-level interfaces. As listed in the future plan, we would consider
> allowing operator level resource configuration later if we do see a need
> for it. At that time, we definitely should discuss what to do if
resources
> are configured at both levels.
>
> * Could SSG express a negative resource requirement?
> >
> No.
>
> Is there a concrete bar for partially configured resources not functioning? I
> > saw it will fail job submission in Dispatcher.submitJob.
> >
> With the SSG-based approach, this should no longer be needed. The
> constraint was introduced because we can neither properly define what is
> the resource of a task chained from an operator with specified resource
and
> another with unspecified resource, nor for a slot shared by a task with
> specified resource and another with unspecified resource. With the
> SSG-based approach, we no longer have those problems.
>
> An option (cluster/job level) to force slot sharing in the scheduler? This
> > could be useful in case of migration from FLIP-156 to a future approach.
> >
> I think this is exactly what we are trying to avoid, requiring the
> scheduler to enforce slot sharing.
>
> An option (cluster) to ignore resource specifying (allow a resource-specified
> > job to run in an open-box environment) for non-production usage?
> >
> That's possible. Actually, we are planning to introduce an option for
> activating the fine-grained resource management, for development
purposes.
> We might consider to keep that option after the feature is completed, to
> allow disable the feature without having to touch the job codes.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Feb 3, 2021 at 1:28 PM Kezhu Wang  wrote:
>
> > Hi all, sorry for joining the discussion even after voting started.
> >
> > I want to share my thoughts on this after reading above discussions.
> >
> > I think Flink *runtime* already has an ideal granularity for resource
> > management 'task'. If there is
> > a slot shared by multiple tasks, that slot's resource requirement is
> simple
> > sum of all its logical
> > slots. So basically, there is no resource requirement for
SlotSharingGroup
> > in runtime until now,
> > right ?
> >
> > As in discussion, we already agree upon that: "If all operators have
> their
> > resources properly
> > specified, then slot sharing is no longer needed. "
> >
> > So it seems to me that what we would naturally discuss is:
> how
> > to bridge the impractical
> > operator-level resource specifying to the runtime task-level resource
> > requirement? This is actually a
> > pure API thing as Chesnay has pointed out.
> >
> 

Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Kezhu Wang
Hi Yangze and Xintong, thank you for your replies.

I indeed made assumptions; I list them here in order:
1. There is only task/LogicalSlot-level resource specification in the runtime.
It comes from the API side and is respected by the runtime.
2. The current operator-level resource specification on the client side is
respected and used to aggregate
   the task resource specification for runtime usage.
3. It is possible that other fine-grained group-level resource specifying,
which could obey chaining, emerges in the future.

My proposal is based on the first, and tries to make room for the last two in
SSG resource specifying.

@Xintong

> I think this is exactly what we are trying to avoid, requiring the
scheduler to enforce slot sharing.

I saw the discussion about keeping slot sharing as a hint, but in reality,
should SSG jobs expect to fail or
run slowly if the scheduler does not respect it? A slot with 20GB memory is
different from two 1GB
default-sized slots. So we actually depend on the scheduler
version/implementation/de facto behavior if we
claim it is a hint.

@Xintong
> So, I wonder whether we could interpret SSG resource specifying as an
"add"
> but not an "set" on resource requirement ?

> IIUC, this is the core idea behind your proposal.

You are right, all other changes serve this. It is also the
semantic divergence between
the two: my suggestion treats the SSG as a hint and an extra place to specify
resources, while FLIP-156 tends
to treat the SSG as a restriction and the authoritative resource
specification. With this change, I think FLIP-156
is just a special case that forces only SSG and no other specifications.
That is, if there are no other
resource specifications, "set" equals "add" to zero. So if this is the
case after FLIP-156, then
there is still room for this direction, if indeed required.
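
To make the "add" interpretation concrete, a toy sketch (everything except
ResourceSpec is hypothetical; it only illustrates the aggregation, not any
actual scheduler code):

import java.util.List;
import org.apache.flink.api.common.operators.ResourceSpec;

final class AddSemanticsSketch {
    // "Add": the group requirement is the sum of the per-task specs plus the
    // extra spec declared on the SSG itself ("set" is the special case where
    // no per-task specs exist and the extra spec stands alone).
    static ResourceSpec groupRequirement(List<ResourceSpec> taskSpecs, ResourceSpec extraGroupSpec) {
        ResourceSpec total = extraGroupSpec;
        for (ResourceSpec taskSpec : taskSpecs) {
            total = total.merge(taskSpec);
        }
        return total;
    }
}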

@Yangze, @Xintong
> never really used

Do you mean the code path or production environments? If it is the code
path, could you please point out where
the story breaks?

From the discussion and history, could I consider FLIP-156 a redirection
rather than an inheritance/enhancement
of the current half-baked/ancient implementation?

Thank you, Yangze and Xintong.


On February 3, 2021 at 14:31:28, Xintong Song (tonysong...@gmail.com) wrote:

Thanks for your feedback, Kezhu.

I think Flink *runtime* already has an ideal granularity for resource
> management 'task'. If there is
> a slot shared by multiple tasks, that slot's resource requirement is
simple
> sum of all its logical
> slots. So basically, there is no resource requirement for SlotSharingGroup
> in runtime until now,
> right ?

That is a half-baked implementation, coming from the previous attempts
(years ago) trying to deliver the fine-grained resource management feature,
and never really put into use.

From the FLIP and discussion, I assume that SSG resource specifying will
> override operator level
> resource specifying if both are specified ?
>
Actually, I think we should use the finer-grained resources (i.e. operator
level) if both are specified. And more importantly, that is based on the
assumption that we do need two different levels of interfaces.

So, I wonder whether we could interpret SSG resource specifying as an "add"
> but not an "set" on
> resource requirement ?
>
IIUC, this is the core idea behind your proposal. I think it provides an
interesting idea of how we combine operator level and SSG level resources,
*if
we allow configuring resources at both levels*. However, I'm not sure
whether the configuring resources on the operator level is indeed needed.
Therefore, as a first step, this FLIP proposes to only introduce the
SSG-level interfaces. As listed in the future plan, we would consider
allowing operator level resource configuration later if we do see a need
for it. At that time, we definitely should discuss what to do if resources
are configured at both levels.

* Could SSG express a negative resource requirement?
>
No.

Is there a concrete bar for partially configured resources not functioning? I
> saw it will fail job submission in Dispatcher.submitJob.
>
With the SSG-based approach, this should no longer be needed. The
constraint was introduced because we can neither properly define what is
the resource of a task chained from an operator with specified resource and
another with unspecified resource, nor for a slot shared by a task with
specified resource and another with unspecified resource. With the
SSG-based approach, we no longer have those problems.

An option (cluster/job level) to force slot sharing in the scheduler? This
> could be useful in case of migration from FLIP-156 to a future approach.
>
I think this is exactly what we are trying to avoid, requiring the
scheduler to enforce slot sharing.

An option (cluster) to ignore resource specifying (allow a resource-specified
> job to run in an open-box environment) for non-production usage?
>
That's possible. Actually, we are planning to introduce an option for
activating the fine-grained resource management, for development purposes.
We might consider 

Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-03 Thread Jane Chan
Reply @Timo

> Remove the `used` column for SHOW MODULES. It will always show true.
>
Good catch. It's a copy-paste typo, and I forgot to remove that column.

How about creating a POJO (static inner class of ModuleManager) called
> `ModuleEntry` or similar.
>
+1 for better encapsulation.

Reply @Jark

> A minor comment on `useModules(List<String> names)`, would be better to
> use varargs here to a more fluent API: `useModules("a", "b", "c")`.
>
 +1, and that's better.

Do we also need to add these new methods (useModules, listFullModules)
> to TableEnvironment?
>
Yes, indeed.
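
For reference, a minimal sketch of the shape we seem to have agreed on
(illustrative only; the real signatures and placement in flink-table may
differ):

import java.util.Collections;
import java.util.List;

public final class ModuleManager {

    /** POJO replacing Pair<String, Boolean>, as Timo suggested. */
    public static final class ModuleEntry {
        private final String name;
        private final boolean used;

        public ModuleEntry(String name, boolean used) {
            this.name = name;
            this.used = used;
        }

        public String name() { return name; }
        public boolean used() { return used; }
    }

    /** Varargs variant for a more fluent API, as Jark suggested. */
    public void useModules(String... names) {
        // enable the given modules in the given resolution order
    }

    /** All loaded modules together with their enabled ("used") status. */
    public List<ModuleEntry> listFullModules() {
        return Collections.emptyList(); // stub
    }
}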

Thank you all for polishing this proposal to make it more thorough.

Best,
Jane

On Wed, Feb 3, 2021 at 6:41 PM Jark Wu  wrote:

> A minor comment on `useModules(List<String> names)`,
> would be better to use varargs here to a more fluent API: `useModules("a",
> "b", "c")`.
>
> Besides, do we also need to add these new methods (useModules,
> listFullModules) to
> TableEnvironment?
>
> Best,
> Jark
>
> On Wed, 3 Feb 2021 at 18:36, Timo Walther  wrote:
>
> > Thanks for the nice summary Jane. The summary looks great. Some minor
> > feedback:
> >
> > - Remove the `used` column for SHOW MODULES. It will always show true.
> >
> > - `List<Pair<String, Boolean>> listFullModules()` is a very long
> > signature. And `Pair` should be avoided in code because it is not very
> > descriptive. How about creating a POJO (static inner class of
> > ModuleManager) called `ModuleEntry` or similar.
> >
> > Otherwise +1 for the proposal.
> >
> > Regards,
> > Timo
> >
> > On 03.02.21 11:24, Jane Chan wrote:
> > > Hi everyone,
> > >
> > > I did a summary on the Jira issue page [1] since the discussion has
> > > achieved a consensus. If there is anything missed or not corrected,
> > please
> > > let me know.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-21045#
> > >
> > > Best,
> > > Jane
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Feb 3, 2021 at 1:33 PM Jark Wu  wrote:
> > >
> > >> Hi Jane,
> > >>
> > >> Yes. I think we should fail fast.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Wed, 3 Feb 2021 at 12:06, Jane Chan  wrote:
> > >>
> > >>> Hi everyone,
> > >>>
> > >>> Thanks for the discussion to make this improvement plan clearer.
> > >>>
> > >>> Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion
> > summaries
> > >>> now and want to confirm one thing: for the statement `USE
> MODULES x
> > >> [,
> > >>> y, z, ...]`, if the module name list contains a nonexistent module,
> > shall
> > >> we
> > >>> #1 fail the execution for all of them or #2 enable the rest of the modules
> > and
> > >>> return a warning to users? My personal preference goes to #1 for
> > >>> simplicity. What do you think?
> > >>>
> > >>> Best,
> > >>> Jane
> > >>>
> > >>> On Tue, Feb 2, 2021 at 3:53 PM Timo Walther 
> > wrote:
> > >>>
> >  +1
> > 
> >  @Jane Can you summarize our discussion in the JIRA issue?
> > 
> >  Thanks,
> >  Timo
> > 
> > 
> >  On 02.02.21 03:50, Jark Wu wrote:
> > > Hi Timo,
> > >
> > >> Another question is whether a LOAD operation also adds the module
> to
> > >>> the
> > > enabled list by default?
> > >
> > > I would like to add the module to the enabled list by default, the
> > >> main
> > > reasons are:
> > > 1) Reordering is an advanced requirement, adding modules needs
> > >>> additional
> > > USE statements with "core" module
> > >sounds too burdensome. Most users should be satisfied with only
> > >> LOAD
> > > statements.
> > > 2) We should keep compatible for TableEnvironment#loadModule().
> > > 3) We are using the LOAD statement instead of CREATE, so I think
> it's
> >  fine
> > > that it does some implicit things.
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 2 Feb 2021 at 00:48, Timo Walther 
> > >> wrote:
> > >
> > >> Not the module itself but the ModuleManager should handle this
> case,
> >  yes.
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >> On 01.02.21 17:35, Jane Chan wrote:
> > >>> +1 to Jark's proposal
> > >>>
> > >>> To make it clearer,  will `module#getFunctionDefinition()`
> > >> return
> >  empty
> > >>> suppose the module is loaded but not enabled?
> > >>>
> > >>> Best,
> > >>> Jane
> > >>>
> > >>> On Mon, Feb 1, 2021 at 10:02 PM Timo Walther  >
> >  wrote:
> > >>>
> >  +1 to Jark's proposal
> > 
> >  I like the difference between just loading and actually enabling
> > >>> these
> >  modules.
> > 
> >  @Rui: I would use the same behavior as catalogs here. You cannot
> >  `USE` a
> >  catalog without creating it before.
> > 
> >  Another question is whether a LOAD operation also adds the
> module
> > >> to
> >  the
> >  enabled list by default?
> > 
> >  Regards,
> >  Timo
> > 
> >  On 01.02.21 13:52, Rui Li wrote:

[jira] [Created] (FLINK-21260) Add DeclarativeScheduler / Finished state

2021-02-03 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-21260:
--

 Summary: Add DeclarativeScheduler / Finished state
 Key: FLINK-21260
 URL: https://issues.apache.org/jira/browse/FLINK-21260
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Robert Metzger
 Fix For: 1.13.0


This subtask of adding the declarative scheduler is about adding the Finished 
state to Flink, including tests.

Finished: The job execution has been completed.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21259) Add DeclarativeScheduler / Failing state

2021-02-03 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-21259:
--

 Summary: Add DeclarativeScheduler / Failing state
 Key: FLINK-21259
 URL: https://issues.apache.org/jira/browse/FLINK-21259
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Robert Metzger
 Fix For: 1.13.0


This subtask of adding the declarative scheduler is about adding the Failing 
state to Flink, including tests.

Failing: An unrecoverable fault has occurred. The scheduler stops the 
ExecutionGraph by canceling it.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21258) Add DeclarativeScheduler / Canceling state

2021-02-03 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-21258:
--

 Summary: Add DeclarativeScheduler / Canceling state
 Key: FLINK-21258
 URL: https://issues.apache.org/jira/browse/FLINK-21258
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Robert Metzger
 Fix For: 1.13.0


This subtask of adding the declarative scheduler is about adding the Canceling 
state to Flink, including tests.

Canceling: The job has been canceled by the user. The scheduler stops the 
ExecutionGraph by canceling it.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21257) Add DeclarativeScheduler / Restarting state

2021-02-03 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-21257:
--

 Summary: Add DeclarativeScheduler / Restarting state
 Key: FLINK-21257
 URL: https://issues.apache.org/jira/browse/FLINK-21257
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Robert Metzger
 Fix For: 1.13.0


This subtask of adding the declarative scheduler is about adding the Restarting 
state to Flink, including tests.

Restarting: A recoverable fault has occurred. The scheduler stops the 
ExecutionGraph by canceling it.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21256) Add DeclarativeScheduler / Executing state

2021-02-03 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-21256:
--

 Summary: Add DeclarativeScheduler / Executing state
 Key: FLINK-21256
 URL: https://issues.apache.org/jira/browse/FLINK-21256
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Robert Metzger
Assignee: Robert Metzger
 Fix For: 1.13.0


This subtask of adding the declarative scheduler is about adding the Executing 
state to Flink, including tests.

Executing: The set of resources is stable and the scheduler could decide on the 
parallelism with which to execute the job. The ExecutionGraph is created and 
the execution of the job has started.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21255) Add DeclarativeScheduler / WaitingForResources state

2021-02-03 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-21255:
--

 Summary: Add DeclarativeScheduler / WaitingForResources state
 Key: FLINK-21255
 URL: https://issues.apache.org/jira/browse/FLINK-21255
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Robert Metzger
Assignee: Robert Metzger
 Fix For: 1.13.0


This subtask of adding the declarative scheduler is about adding the 
WaitingForResources 
state to Flink, including tests.

Waiting for resources: The required resources are declared. The scheduler waits 
until either the requirements are fulfilled or the set of resources has 
stabilised.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21254) Add DeclarativeScheduler / Created state

2021-02-03 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-21254:
--

 Summary: Add DeclarativeScheduler / Created state 
 Key: FLINK-21254
 URL: https://issues.apache.org/jira/browse/FLINK-21254
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Robert Metzger
Assignee: Robert Metzger
 Fix For: 1.13.0


This subtask of adding the declarative scheduler is about adding the Created 
state to Flink, including tests.
Created: Initial state of the scheduler
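
Taken together, FLINK-21254 through FLINK-21260 describe a state machine
roughly like the following illustrative enum (a reading aid, not the actual
scheduler code):

{code:java}
// Summary of the declarative scheduler states from FLINK-21254..FLINK-21260.
enum DeclarativeSchedulerState {
    CREATED,               // initial state of the scheduler
    WAITING_FOR_RESOURCES, // resources declared; wait until fulfilled or stabilised
    EXECUTING,             // resources stable; ExecutionGraph created, job running
    RESTARTING,            // recoverable fault; ExecutionGraph is canceled, then restarted
    CANCELING,             // user canceled the job; ExecutionGraph is canceled
    FAILING,               // unrecoverable fault; ExecutionGraph is canceled
    FINISHED               // job execution has completed
}
{code}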




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Proposal to add Google Cloud Storage FileSystem with RecoverableWriter

2021-02-03 Thread Xintong Song
@Till,
Did I overlook anything? I can't find Galen's post on the old PR.

Thank you~

Xintong Song



On Wed, Feb 3, 2021 at 6:10 PM Till Rohrmann  wrote:

> @Galen, I've just seen that you posted your ideas on the old Github PR. I
> think it would be better to post it on the JIRA ticket [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-11838
>
> Cheers,
> Till
>
> On Tue, Feb 2, 2021 at 12:02 PM Xintong Song 
> wrote:
>
> > Hi Galen,
> >
> > Thanks for offering the contribution.
> >
> > As Till has already suggested, please comment on FLINK-11838 your
> solution
> > proposal.
> > Once we reach consensus on the proposal, I'll assign you to the ticket.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Feb 2, 2021 at 5:19 PM Till Rohrmann 
> wrote:
> >
> > > Hi Galen,
> > >
> > > I think that adding support for GCS using the StreamingFileSink sounds
> > like
> > > a very good idea to me. Looking at FLINK-11838 I believe that this
> effort
> > > has been abandoned. I think that you could take this ticket over if you
> > > want. Maybe you could update this ticket with your solution proposal.
> > >
> > > I will check whether I can find a committer who could help you with
> this
> > > effort.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Sat, Jan 30, 2021 at 7:43 PM Galen Warren 
> > > wrote:
> > >
> > > > Hi -- I'm wondering if you would be interested in a contribution to
> > add a
> > > > HadoopFileSystem implementation, with associated RecoverableWriter,
> for
> > > > Google Cloud Storage. This would be similar to what's already in
> place
> > > for
> > > > S3, and it would allow writing to GCS using a StreamingFileSink. The
> > > > implementation would be similar to what's already in place for S3.
> > > >
> > > > I see there's been some work on this before (FLINK-11838 Add GCS
> > > > RecoverableWriter by Fokko · Pull Request #7915 · apache/flink
> > > > (github.com)), but the original people
> > > > working on it have put it on hold, and the last activity was over six
> > > > months ago.
> > > >
> > > > I need this for my own purposes and I have an implementation that I'm
> > > > working on locally. I'd be interested to contribute this if you'd be
> > > > interested. Let me know if so and I'll create a Jira ticket.
> > > >
> > > > Thanks,
> > > > Galen Warren
> > > >
> > >
> >
>


Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-03 Thread Jark Wu
A minor comment on `useModules(List<String> names)`:
it would be better to use varargs here for a more fluent API: `useModules("a",
"b", "c")`.

Besides, do we also need to add these new methods (useModules,
listFullModules) to
TableEnvironment?

Best,
Jark

On Wed, 3 Feb 2021 at 18:36, Timo Walther  wrote:

> Thanks for the nice summary Jane. The summary looks great. Some minor
> feedback:
>
> - Remove the `used` column for SHOW MODULES. It will always show true.
>
> - `List<Pair<String, Boolean>> listFullModules()` is a very long
> signature. And `Pair` should be avoided in code because it is not very
> descriptive. How about creating a POJO (static inner class of
> ModuleManager) called `ModuleEntry` or similar.
>
> Otherwise +1 for the proposal.
>
> Regards,
> Timo
>
> On 03.02.21 11:24, Jane Chan wrote:
> > Hi everyone,
> >
> > I did a summary on the Jira issue page [1] since the discussion has
> > achieved a consensus. If there is anything missed or not corrected,
> please
> > let me know.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21045#
> >
> > Best,
> > Jane
> >
> >
> >
> >
> >
> > On Wed, Feb 3, 2021 at 1:33 PM Jark Wu  wrote:
> >
> >> Hi Jane,
> >>
> >> Yes. I think we should fail fast.
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 3 Feb 2021 at 12:06, Jane Chan  wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> Thanks for the discussion to make this improvement plan clearer.
> >>>
> >>> Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion
> summaries
> >>> now and want to confirm one thing: for the statement `USE MODULES x
> >> [,
> >>> y, z, ...]`, if the module name list contains a nonexistent module,
> shall
> >> we
> >>> #1 fail the execution for all of them or #2 enable the rest of the modules
> and
> >>> return a warning to users? My personal preference goes to #1 for
> >>> simplicity. What do you think?
> >>>
> >>> Best,
> >>> Jane
> >>>
> >>> On Tue, Feb 2, 2021 at 3:53 PM Timo Walther 
> wrote:
> >>>
>  +1
> 
>  @Jane Can you summarize our discussion in the JIRA issue?
> 
>  Thanks,
>  Timo
> 
> 
>  On 02.02.21 03:50, Jark Wu wrote:
> > Hi Timo,
> >
> >> Another question is whether a LOAD operation also adds the module to
> >>> the
> > enabled list by default?
> >
> > I would like to add the module to the enabled list by default, the
> >> main
> > reasons are:
> > 1) Reordering is an advanced requirement, adding modules needs
> >>> additional
> > USE statements with "core" module
> >sounds too burdensome. Most users should be satisfied with only
> >> LOAD
> > statements.
> > 2) We should keep compatible for TableEnvironment#loadModule().
> > 3) We are using the LOAD statement instead of CREATE, so I think it's
>  fine
> > that it does some implicit things.
> >
> > Best,
> > Jark
> >
> > On Tue, 2 Feb 2021 at 00:48, Timo Walther 
> >> wrote:
> >
> >> Not the module itself but the ModuleManager should handle this case,
>  yes.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 01.02.21 17:35, Jane Chan wrote:
> >>> +1 to Jark's proposal
> >>>
> >>> To make it clearer,  will `module#getFunctionDefinition()`
> >> return
>  empty
> >>> suppose the module is loaded but not enabled?
> >>>
> >>> Best,
> >>> Jane
> >>>
> >>> On Mon, Feb 1, 2021 at 10:02 PM Timo Walther 
>  wrote:
> >>>
>  +1 to Jark's proposal
> 
>  I like the difference between just loading and actually enabling
> >>> these
>  modules.
> 
>  @Rui: I would use the same behavior as catalogs here. You cannot
>  `USE` a
>  catalog without creating it before.
> 
>  Another question is whether a LOAD operation also adds the module
> >> to
>  the
>  enabled list by default?
> 
>  Regards,
>  Timo
> 
>  On 01.02.21 13:52, Rui Li wrote:
> > If `USE MODULES` implies unloading modules that are not listed,
> >>> does
>  it
> > also imply loading modules that are not previously loaded,
> >>> especially
>  since
> > we're mapping modules by name now?
> >
> > On Mon, Feb 1, 2021 at 8:20 PM Jark Wu  wrote:
> >
> >> I agree with Timo that the USE implies the specified modules are
> >>> in
> >> use
>  in
> >> the specified order and others are not used.
> >> This would be easier to know what's the result list and order
> >>> after
> >> the
>  USE
> >> statement.
> >> That means: if current modules in order are x, y, z. And `USE
>  MODULES
>  z, y`
> >> means current modules in order are z, y.
> >>
> >> But I would like to not unload the unmentioned modules in the
> >> USE
> >> statement. Because it seems strange that USE
> >> will implicitly remove modules. In the abo

Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-03 Thread Timo Walther
Thanks for the nice summary Jane. The summary looks great. Some minor 
feedback:


- Remove the `used` column for SHOW MODULES. It will always show true.

- `List<Pair<String, Boolean>> listFullModules()` is a very long 
signature. And `Pair` should be avoided in code because it is not very 
descriptive. How about creating a POJO (static inner class of 
ModuleManager) called `ModuleEntry` or similar.


Otherwise +1 for the proposal.

Regards,
Timo
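
A possible shape for the suggested POJO, as a sketch (field and accessor names
are assumptions):

// Illustrative replacement for Pair<String, Boolean> in listFullModules().
public final class ModuleEntry {
    private final String name;
    private final boolean used;

    public ModuleEntry(String name, boolean used) {
        this.name = name;
        this.used = used;
    }

    public String name() {
        return name;
    }

    public boolean used() {
        return used;
    }

    @Override
    public String toString() {
        return "ModuleEntry{name='" + name + "', used=" + used + "}";
    }
}

`listFullModules()` would then return `List<ModuleEntry>` instead of
`List<Pair<String, Boolean>>`.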

On 03.02.21 11:24, Jane Chan wrote:
> Hi everyone,
>
> I did a summary on the Jira issue page [1] since the discussion has
> achieved a consensus. If there is anything missed or not corrected, please
> let me know.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21045#
>
> Best,
> Jane
>
> On Wed, Feb 3, 2021 at 1:33 PM Jark Wu  wrote:
>
>> Hi Jane,
>>
>> Yes. I think we should fail fast.
>>
>> Best,
>> Jark
>>
>> On Wed, 3 Feb 2021 at 12:06, Jane Chan  wrote:
>>
>>> Hi everyone,
>>>
>>> Thanks for the discussion to make this improvement plan clearer.
>>>
>>> Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion summaries
>>> now and want to confirm one thing: for the statement `USE MODULES x [,
>>> y, z, ...]`, if the module name list contains a nonexistent module, shall
>>> we #1 fail the execution for all of them or #2 enable the rest of the
>>> modules and return a warning to users? My personal preference goes to #1
>>> for simplicity. What do you think?
>>>
>>> Best,
>>> Jane
>>>
>>> On Tue, Feb 2, 2021 at 3:53 PM Timo Walther  wrote:
>>>
>>>> +1
>>>>
>>>> @Jane Can you summarize our discussion in the JIRA issue?
>>>>
>>>> Thanks,
>>>> Timo
>>>>
>>>> On 02.02.21 03:50, Jark Wu wrote:
>>>>> Hi Timo,
>>>>>
>>>>>> Another question is whether a LOAD operation also adds the module to
>>>>>> the enabled list by default?
>>>>>
>>>>> I would like to add the module to the enabled list by default; the main
>>>>> reasons are:
>>>>> 1) Reordering is an advanced requirement; adding modules needs
>>>>> additional USE statements with the "core" module, which sounds too
>>>>> burdensome. Most users should be satisfied with only LOAD statements.
>>>>> 2) We should keep TableEnvironment#loadModule() compatible.
>>>>> 3) We are using the LOAD statement instead of CREATE, so I think it's
>>>>> fine that it does some implicit things.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Tue, 2 Feb 2021 at 00:48, Timo Walther  wrote:
>>>>>
>>>>>> Not the module itself but the ModuleManager should handle this case,
>>>>>> yes.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>> On 01.02.21 17:35, Jane Chan wrote:
>>>>>>> +1 to Jark's proposal
>>>>>>>
>>>>>>> To make it clearer, will `module#getFunctionDefinition()` return
>>>>>>> empty suppose the module is loaded but not enabled?
>>>>>>>
>>>>>>> Best,
>>>>>>> Jane
>>>>>>>
>>>>>>> On Mon, Feb 1, 2021 at 10:02 PM Timo Walther  wrote:
>>>>>>>
>>>>>>>> +1 to Jark's proposal
>>>>>>>>
>>>>>>>> I like the difference between just loading and actually enabling
>>>>>>>> these modules.
>>>>>>>>
>>>>>>>> @Rui: I would use the same behavior as catalogs here. You cannot
>>>>>>>> `USE` a catalog without creating it before.
>>>>>>>>
>>>>>>>> Another question is whether a LOAD operation also adds the module to
>>>>>>>> the enabled list by default?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>> On 01.02.21 13:52, Rui Li wrote:
>>>>>>>>> If `USE MODULES` implies unloading modules that are not listed,
>>>>>>>>> does it also imply loading modules that are not previously loaded,
>>>>>>>>> especially since we're mapping modules by name now?
>>>>>>>>>
>>>>>>>>> On Mon, Feb 1, 2021 at 8:20 PM Jark Wu  wrote:
>>>>>>>>>
>>>>>>>>>> I agree with Timo that the USE implies the specified modules are
>>>>>>>>>> in use in the specified order and others are not used.
>>>>>>>>>> This would make it easier to know what the result list and order
>>>>>>>>>> are after the USE statement.
>>>>>>>>>> That means: if the current modules in order are x, y, z, then
>>>>>>>>>> `USE MODULES z, y` means the current modules in order are z, y.
>>>>>>>>>>
>>>>>>>>>> But I would like to not unload the unmentioned modules in the USE
>>>>>>>>>> statement, because it seems strange that USE
>>>>>>>>>> will implicitly remove modules. In the above example, the user may
>>>>>>>>>> type the wrong module list using USE by mistake
>>>>>>>>>> and would like to declare the list again; the user would have to
>>>>>>>>>> create the module again with some properties he may not know.
>>>>>>>>>> Therefore, I propose the USE statement just specifies the current
>>>>>>>>>> module list and doesn't unload modules.
>>>>>>>>>> Besides that, we may need a new syntax to list all the modules,
>>>>>>>>>> including those not used but loaded.
>>>>>>>>>> We can introduce SHOW FULL MODULES for this purpose with an
>>>>>>>>>> additional `used` column.
>>>>>>>>>>
>>>>>>>>>> For example:
>>>>>>>>>>
>>>>>>>>>> Flink SQL> list modules;
>>>>>>>>>> ---
>>>>>>>>>> | modules |
>>>>>>>>>> ---
>>>>>>>>>> | x   |
>>>>>>>>>> | y   |
>>>>>>>>>> | z   |
>>>>>>>>>> ---
>>>>>>>>>> Flink SQL> USE MODULES z, y;
>>>>>>>>>> Flink SQL> show modules;
>>>>>>>>>> ---
>>>>>>>>>> | modules |
>>>>>>>>>> ---
>>>>>>>>>> | z   |
>>>>>>>>>> | y   |
>>>>>>>>>> ---
>>>>>>>>>> Flink SQL> show FULL modules;
>>>>>>>>>> ---
>>>>>>>>>> | modules |  used |
>>>>>>>>>> ---
>>>>>>>>>> | z   | true  |
>>>>>>>>>> | y   | true  |
>>>>>>>>>> | x   | false |
>>>>>>>>>> ---
>>>>>>>>>> Flink SQL> USE MODULES z, y, x;
>>>>>>>>>> Flink SQL> show modules;
>>>>>>>>>> ---
>>>>>>>>>> | modules |
>>>>>>>>>> ---
>>>>>>>>>> | z   |
>>>>>>>>>> | y   |
>>>>>>>>>> | x   |
>>>>>>>>>> ---
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Mon, 1 Feb 2021 at 19:02, Jane Chan  wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Timo, thanks for the discussion.
>>>>>>>>>>>
>>>>>>>>>>> It seems to reach an agreement regarding #3 that <1> Module name
>>>>>>>>>>> should better be a simple identifier rather than a string literal.
>>>>>>>>>>> <2> Property `type` is redundant and should be removed, and mapping

Re: [DISCUSS] FLINK-21045: Support 'load module' and 'unload module' SQL syntax

2021-02-03 Thread Jane Chan
Hi everyone,

I did a summary on the Jira issue page [1] since the discussion has
achieved a consensus. If there is anything missed or not corrected, please
let me know.

[1] https://issues.apache.org/jira/browse/FLINK-21045#

Best,
Jane





On Wed, Feb 3, 2021 at 1:33 PM Jark Wu  wrote:

> Hi Jane,
>
> Yes. I think we should fail fast.
>
> Best,
> Jark
>
> On Wed, 3 Feb 2021 at 12:06, Jane Chan  wrote:
>
> > Hi everyone,
> >
> > Thanks for the discussion to make this improvement plan clearer.
> >
> > Hi, @Jark, @Rui, and @Timo, I'm collecting the final discussion summaries
> > now and want to confirm one thing: for the statement `USE MODULES x
> [,
> > y, z, ...]`, if the module name list contains a nonexistent module, shall
> we
> > #1 fail the execution for all of them or #2 enable the rest of the modules and
> > return a warning to users? My personal preference goes to #1 for
> > simplicity. What do you think?
> >
> > Best,
> > Jane
> >
> > On Tue, Feb 2, 2021 at 3:53 PM Timo Walther  wrote:
> >
> > > +1
> > >
> > > @Jane Can you summarize our discussion in the JIRA issue?
> > >
> > > Thanks,
> > > Timo
> > >
> > >
> > > On 02.02.21 03:50, Jark Wu wrote:
> > > > Hi Timo,
> > > >
> > > >> Another question is whether a LOAD operation also adds the module to
> > the
> > > > enabled list by default?
> > > >
> > > > I would like to add the module to the enabled list by default, the
> main
> > > > reasons are:
> > > > 1) Reordering is an advanced requirement, adding modules needs
> > additional
> > > > USE statements with "core" module
> > > >   sounds too burdensome. Most users should be satisfied with only
> LOAD
> > > > statements.
> > > > 2) We should keep compatible for TableEnvironment#loadModule().
> > > > 3) We are using the LOAD statement instead of CREATE, so I think it's
> > > fine
> > > > that it does some implicit things.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Tue, 2 Feb 2021 at 00:48, Timo Walther 
> wrote:
> > > >
> > > >> Not the module itself but the ModuleManager should handle this case,
> > > yes.
> > > >>
> > > >> Regards,
> > > >> Timo
> > > >>
> > > >>
> > > >> On 01.02.21 17:35, Jane Chan wrote:
> > > >>> +1 to Jark's proposal
> > > >>>
> > > >>> To make it clearer, will `module#getFunctionDefinition()`
> return
> > > empty
> > > >>> suppose the module is loaded but not enabled?
> > > >>>
> > > >>> Best,
> > > >>> Jane
> > > >>>
> > > >>> On Mon, Feb 1, 2021 at 10:02 PM Timo Walther 
> > > wrote:
> > > >>>
> > >  +1 to Jark's proposal
> > > 
> > >  I like the difference between just loading and actually enabling
> > these
> > >  modules.
> > > 
> > >  @Rui: I would use the same behavior as catalogs here. You cannot
> > > `USE` a
> > >  catalog without creating it before.
> > > 
> > >  Another question is whether a LOAD operation also adds the module
> to
> > > the
> > >  enabled list by default?
> > > 
> > >  Regards,
> > >  Timo
> > > 
> > >  On 01.02.21 13:52, Rui Li wrote:
> > > > If `USE MODULES` implies unloading modules that are not listed,
> > does
> > > it
> > > > also imply loading modules that are not previously loaded,
> > especially
> > >  since
> > > > we're mapping modules by name now?
> > > >
> > > > On Mon, Feb 1, 2021 at 8:20 PM Jark Wu  wrote:
> > > >
> > > >> I agree with Timo that the USE implies the specified modules are
> > in
> > > >> use
> > >  in
> > > >> the specified order and others are not used.
> > > >> This would be easier to know what's the result list and order
> > after
> > > >> the
> > >  USE
> > > >> statement.
> > > >> That means: if current modules in order are x, y, z. And `USE
> > > MODULES
> > >  z, y`
> > > >> means current modules in order are z, y.
> > > >>
> > > >> But I would like to not unload the unmentioned modules in the
> USE
> > > >> statement. Because it seems strange that USE
> > > >> will implicitly remove modules. In the above example, the user
> may
> > > >> type
> > >  the
> > > >> wrong modules list using USE by mistake
> > > >> and would like to declare the list again, the user has to
> > create
> > > >> the
> > > >> module again with some properties he may not know. Therefore,
> I
> > >  propose
> > > >> the USE statement just specifies the current module lists and
> > > doesn't
> > > >> unload modules.
> > > >> Besides that, we may need a new syntax to list all the modules
> > > >> including
> > > >> not used but loaded.
> > > >> We can introduce SHOW FULL MODULES for this purpose with an
> > > additional
> > > >> `used` column.
> > > >>
> > > >> For example:
> > > >>
> > > >> Flink SQL> list modules;
> > > >> ---
> > > >> | modules |
> > > >> ---
> > > >> | x   |
> > > >> | y   |
> > > >> | z   |
> > > >> ---
> > > >> Flink SQL> USE MODUL

Re: Proposal to add Google Cloud Storage FileSystem with RecoverableWriter

2021-02-03 Thread Till Rohrmann
@Galen, I've just seen that you posted your ideas on the old Github PR. I
think it would be better to post it on the JIRA ticket [1].

[1] https://issues.apache.org/jira/browse/FLINK-11838

Cheers,
Till

On Tue, Feb 2, 2021 at 12:02 PM Xintong Song  wrote:

> Hi Galen,
>
> Thanks for offering the contribution.
>
> As Till has already suggested, please comment on FLINK-11838 your solution
> proposal.
> Once we reach consensus on the proposal, I'll assign you to the ticket.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Feb 2, 2021 at 5:19 PM Till Rohrmann  wrote:
>
> > Hi Galen,
> >
> > I think that adding support for GCS using the StreamingFileSink sounds
> like
> > a very good idea to me. Looking at FLINK-11838 I believe that this effort
> > has been abandoned. I think that you could take this ticket over if you
> > want. Maybe you could update this ticket with your solution proposal.
> >
> > I will check whether I can find a committer who could help you with this
> > effort.
> >
> > Cheers,
> > Till
> >
> > On Sat, Jan 30, 2021 at 7:43 PM Galen Warren 
> > wrote:
> >
> > > Hi -- I'm wondering if you would be interested in a contribution to
> add a
> > > HadoopFileSystem implementation, with associated RecoverableWriter, for
> > > Google Cloud Storage. This would be similar to what's already in place
> > for
> > > S3, and it would allow writing to GCS using a StreamingFileSink. The
> > > implementation would be similar to what's already in place for S3.
> > >
> > > I see there's been some work on this before (FLINK-11838 Add GCS
> > > RecoverableWriter by Fokko · Pull Request #7915 · apache/flink
> > > (github.com)), but the original people
> > > working on it have put it on hold, and the last activity was over six
> > > months ago.
> > >
> > > I need this for my own purposes and I have an implementation that I'm
> > > working on locally. I'd be interested to contribute this if you'd be
> > > interested. Let me know if so and I'll create a Jira ticket.
> > >
> > > Thanks,
> > > Galen Warren
> > >
> >
>


Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Till Rohrmann
Thanks for sharing your thoughts Kezhu. I like your ideas of how
per-operator and SSG requirements can be combined. I've also thought about
defining a default resource profile for all tasks which have no resources
configured. That way all operators would have resources assigned if the
user chooses to use this feature.

As Yangze and Xintong have said, we have decided to first only support
specifying resources for SSGs as this seems more user friendly. Based on
the feedback for this feature one potential development direction might be
to allow resource specification on a per-operator basis. Here we could
pick up your ideas.

Cheers,
Till

On Wed, Feb 3, 2021 at 7:31 AM Xintong Song  wrote:

> Thanks for your feedback, Kezhu.
>
> I think Flink *runtime* already has an ideal granularity for resource
> > management: 'task'. If there is
> > a slot shared by multiple tasks, that slot's resource requirement is the
> simple
> > sum of all its logical
> > slots. So basically, there is no resource requirement for SlotSharingGroup
> > in runtime until now,
> > right?
>
> That is a half-cooked implementation, coming from previous attempts
> (years ago) to deliver the fine-grained resource management feature,
> and it was never really put into use.
>
> From the FLIP and discussion, I assume that SSG resource specifying will
> > override operator level
> > resource specifying if both are specified ?
> >
> Actually, I think we should use the finer-grained resources (i.e. operator
> level) if both are specified. And more importantly, that is based on the
> assumption that we do need two different levels of interfaces.
>
> So, I wonder whether we could interpret SSG resource specifying as an "add"
> > but not an "set" on
> > resource requirement ?
> >
> IIUC, this is the core idea behind your proposal. I think it provides an
> interesting idea of how we combine operator level and SSG level resources,
> *if
> we allow configuring resources at both levels*. However, I'm not sure
> whether the configuring resources on the operator level is indeed needed.
> Therefore, as a first step, this FLIP proposes to only introduce the
> SSG-level interfaces. As listed in the future plan, we would consider
> allowing operator level resource configuration later if we do see a need
> for it. At that time, we definitely should discuss what to do if resources
> are configured at both levels.
>
> * Could SSG express a negative resource requirement?
> >
> No.
>
> Is there a concrete reason that partial resource configuration does not function? I
> > saw it will fail the job submission in Dispatcher.submitJob.
> >
> With the SSG-based approach, this should no longer be needed. The
> constraint was introduced because we could neither properly define the
> resources of a task chained from an operator with specified resources and
> another with unspecified resources, nor for a slot shared by a task with
> specified resources and another with unspecified resources. With the
> SSG-based approach, we no longer have those problems.
>
> An option (cluster/job level) to force slot sharing in the scheduler? This
> > could be useful in case of migration from FLIP-156 to a future approach.
> >
> I think this is exactly what we are trying to avoid, requiring the
> scheduler to enforce slot sharing.
>
> An option (cluster level) to ignore resource specifications (allow a resource-specified
> > job to run on an open-box environment) for non-production usage?
> >
> That's possible. Actually, we are planning to introduce an option for
> activating the fine-grained resource management, for development purposes.
> We might consider keeping that option after the feature is completed, to
> allow disabling the feature without having to touch the job code.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Feb 3, 2021 at 1:28 PM Kezhu Wang  wrote:
>
> > Hi all, sorry for joining the discussion even after voting has started.
> >
> > I want to share my thoughts on this after reading above discussions.
> >
> > I think Flink *runtime* already has an ideal granularity for resource
> > management: 'task'. If there is
> > a slot shared by multiple tasks, that slot's resource requirement is the
> simple
> > sum of all its logical
> > slots. So basically, there is no resource requirement for SlotSharingGroup
> > in runtime until now,
> > right?
> >
> > As in the discussion, we already agreed that: "If all operators have
> their
> > resources properly
> > specified, then slot sharing is no longer needed. "
> >
> > So it seems to me that the natural question to discuss would be:
> how
> > to bridge impractical
> > operator-level resource specifying to runtime task-level resource
> > requirements? This is actually a
> > pure API thing, as Chesnay has pointed out.
> >
> > But FLIP-156 brings another direction to the table: how about using SSG for
> > both API and runtime
> > resource specifying?
> >
> > From the FLIP and discussion, I assume that SSG resource specifying will
> > override operator level
> > resource specifying if both

[jira] [Created] (FLINK-21253) Support grouping set/cube/rollup syntax in WindowAggregate

2021-02-03 Thread Andy (Jira)
Andy created FLINK-21253:


 Summary: Support grouping set/cube/rollup syntax in WindowAggregate
 Key: FLINK-21253
 URL: https://issues.apache.org/jira/browse/FLINK-21253
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Andy


Support grouping set/cube/rollup syntax in WindowAggregate, like the following 
example.

 
{code:java}
SELECT
a,
TUMBLE_START(rowtime, INTERVAL '15' MINUTE),
TUMBLE_END(rowtime, INTERVAL '15' MINUTE),
COUNT(1)
FROM MyTable
GROUP BY GROUPING SETS (`a`, ()), TUMBLE(rowtime, INTERVAL '15' MINUTE)
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21252) Scala quickstarts should specify -target jvm version

2021-02-03 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-21252:


 Summary: Scala quickstarts should specify -target jvm version
 Key: FLINK-21252
 URL: https://issues.apache.org/jira/browse/FLINK-21252
 Project: Flink
  Issue Type: Improvement
  Components: Build System, Quickstarts
Reporter: Chesnay Schepler
 Fix For: 1.13.0


The scala-maven-plugin configuration within Flink itself sets the {{-target}}
parameter to {{jvm-1.8}}; however, the quickstarts do not, and IIRC they default
to Java 6.

As a result it can happen that we add new APIs that are not readily usable by
Scala users.
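
For context, the fix roughly amounts to pinning the bytecode target in the
quickstart POMs, e.g. via a scala-maven-plugin fragment like the following (a
sketch; the exact plugin version and placement may differ):

{code:xml}
<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <configuration>
    <args>
      <!-- emit Java 8 compatible bytecode instead of the default -->
      <arg>-target:jvm-1.8</arg>
    </args>
  </configuration>
</plugin>
{code}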



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21251) Last valid checkpoint metadata lost after job exits restart loop

2021-02-03 Thread Paul Lin (Jira)
Paul Lin created FLINK-21251:


 Summary: Last valid checkpoint metadata lost after job exits 
restart loop
 Key: FLINK-21251
 URL: https://issues.apache.org/jira/browse/FLINK-21251
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.7.2
Reporter: Paul Lin
 Attachments: jm_logs

We have a Flink job of a relatively old version, 1.7.1, that failed with no
valid checkpoint to restore from. The job was first affected by Kafka network
instability and fell into a restart loop with the policy of 3 restarts in 5
minutes. After the restarts were exhausted, the job turned into the final state
FAILED and exited. But the problem is that the last valid checkpoint, 4585, which
was restored multiple times during the restarts, was corrupted (no _metadata)
after the job exited.

I've checked the checkpoint dir on HDFS and found that chk-4585, which was
finished at 12:16, was modified at 12:23 when the jobmanager was shutting down,
with lots of error logs saying the deletes of pending checkpoints somehow failed.
So I suspect that the checkpoint metadata was unexpectedly deleted by the
jobmanager.

 

The jobmanager logs are attached below.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21250) Failure to finalize checkpoint due to org.apache.hadoop.fs.FileAlreadyExistsException: /user/flink/checkpoints/9e36ed4ec2f6685f836e5ee5395f5f2e/chk-11096/_metadata for

2021-02-03 Thread Fangliang Liu (Jira)
Fangliang Liu created FLINK-21250:
-

 Summary:  Failure to finalize checkpoint due to 
org.apache.hadoop.fs.FileAlreadyExistsException: 
/user/flink/checkpoints/9e36ed4ec2f6685f836e5ee5395f5f2e/chk-11096/_metadata 
for client xxx.xx.xx.xxx already exists
 Key: FLINK-21250
 URL: https://issues.apache.org/jira/browse/FLINK-21250
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.10.1
Reporter: Fangliang Liu


Flink Version :1.10.1

The following exception is occasionally thrown when the Flink job is
running on YARN.
{code:java}
1 2021-02-03 15:36:51 level:WARN log:2021-02-03 15:36:51,762 
qujianpan_server_pbv2_kafka2hive WARN 
org.apache.flink.runtime.jobmaster.JobMaster - Error while processing 
checkpoint acknowledgement message 
location:org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:796)
 throwable:org.apache.flink.runtime.checkpoint.CheckpointException: Could 
not finalize the pending checkpoint 11096. Failure reason: Failure to finalize 
checkpoint. at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:863)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:781)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:794)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.hadoop.fs.FileAlreadyExistsException: 
/user/flink/checkpoints/9e36ed4ec2f6685f836e5ee5395f5f2e/chk-11096/_metadata 
for client 172.16.190.74 already exists at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:3021)
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2908)
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2792)
 at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:615)
 at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:117)
 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
 at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at 
org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2278) at 
org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2274) at 
java.security.AccessController.doPrivileged(Native Method) at 
javax.security.auth.Subject.doAs(Subject.java:422) at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2272) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
 at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
 at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1841)
 at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1698) at 
org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1633) at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
 at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at 
org.apache.hadoop.fs.FileSystem.create(FileSy

Re: [DISCUSS] FLIP-152: Hive Query Syntax Compatibility

2021-02-03 Thread Rui Li
Thanks Godfrey & Jark for your comments!

Since both of you mentioned the naming of the parser factory, I'll answer
this question first.

I only intend to introduce the pluggable Parser for blink planner. So
the BlinkParserFactory will be added to the flink-table-planner-blink
module. We already have several factory classes there such as
"BlinkPlannerFactory" and "BlinkExecutorFactory". So it seems
"BlinkParserFactory" is following the naming convention of existing
classes. But I'm also fine with "ParserFactory" if we're going to remove
the "blink" prefix anyway.
For the concrete implementation, I prefer "DefaultParserFactory". Having a
"FlinkParserFactory" in blink planner seems a little weird. We can revisit
this when we really get rid of the legacy planner.
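
To make the shape concrete, a rough sketch of the pluggable factory under
discussion might look like this; all names and signatures here are
illustrative, not the finalized FLIP-152 API:

// Illustrative stubs so the sketch is self-contained.
interface Parser { /* parses SQL text into operations */ }
interface ParserContext { /* catalog manager, table config, etc. */ }

// The pluggable factory: one implementation per supported SQL dialect.
interface ParserFactory {
    String dialect(); // e.g. "default" or "hive"

    Parser create(ParserContext context);
}

class DefaultParserFactory implements ParserFactory {
    @Override
    public String dialect() {
        return "default";
    }

    @Override
    public Parser create(ParserContext context) {
        return new Parser() {}; // would return the Calcite-based parser
    }
}

class HiveParserFactory implements ParserFactory {
    @Override
    public String dialect() {
        return "hive";
    }

    @Override
    public Parser create(ParserContext context) {
        return new Parser() {}; // would return the HiveQL-compatible parser
    }
}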

To answer the other questions:

@Godfrey
Yes I agree DDLs should also be handled by the HiveParser. That'll give
users more consistent experience.

The "Go Beyond Hive" section is mainly about decoupling the dialect from
hive objects, so that users can write HiveQL to query non-hive tables or
call non-hive functions. I think it's in the scope of this FLIP, but it
doesn't have to be done in the first release. I'll update the FLIP to be
more specific about it.

@Jark
Creating a new Parser only when dialect changes is a good idea. I'll update
the FLIP to mention that.
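
A minimal sketch of that caching behavior, kept generic and illustrative (not
the actual planner code):

import java.util.function.Function;

// Keep the last Parser and only create a new one when the dialect changes.
class CachingParserHolder<P> {
    private final Function<String, P> factory;
    private String cachedDialect;
    private P cachedParser;

    CachingParserHolder(Function<String, P> factory) {
        this.factory = factory;
    }

    P getParser(String currentDialect) {
        if (cachedParser == null || !currentDialect.equals(cachedDialect)) {
            cachedParser = factory.apply(currentDialect); // only on dialect switch
            cachedDialect = currentDialect;
        }
        return cachedParser;
    }
}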

The 3.x new features we need to support in this FLIP include:

   1. PK & NOT NULL constraints
   2. Alter DB to change location

These DDLs are already covered by FLIP-123. Missing them will be a
regression.

Other new features can be identified with more tests and user feedback, and
will be supported incrementally.

By "we can use a newer version to support older versions", I mean syntax in
the new version is a superset of the old one. But we still need extra
efforts to make sure the copied code works with different hive versions,
e.g. more shim methods are required. So I'd rather start with a popular
version than the newest version.


On Wed, Feb 3, 2021 at 11:51 AM Jark Wu  wrote:

> Thanks Rui for the great proposal, I believe this can be very attractive
> for many Hive users.
>
> The FLIP looks good to me in general, I only have some minor comments:
>
> 1) BlinkParserFactory
> IIUC, BlinkParserFactory is located in the flink-table-api-java module with
> the Parser interface there.
> I suggest renaming it to `ParserFactory`, because it creates Parser instead
> of BlinkParser.
> And the implementations can be `HiveParserFactory` and
> `FlinkParserFactory`.
> I think we should avoid the `Blink` keyword in interfaces, blink planner is
> already the default planner and
> the old planner will be removed in the near future. There will be no
> `blink` in the future then.
>
> 2) "create a new instance each time getParser is called"
> Creating a new parser every time getParser is called sounds heavy to me. I
> think we can improve this by simply
> caching the Parser instance, and creating a new one if the current sql-dialect
> is different from the cached Parser's.
>
> 3) Hive version
> How much code needs to be done to support new features in 3.x based on 2.x?
> Is this also included in this FLIP/release?
> I don't fully understand this because the FLIP says "we can use a newer
> version to support older versions."
>
> Best,
> Jark
>
> On Wed, 3 Feb 2021 at 11:48, godfrey he  wrote:
>
> > Thanks for bringing up the discussion, Rui!
> >
> > Regarding the DDL part in the "Introduce HiveParser" section,
> > I would like to choose the second option. Because if we could
> > use one hive parser to parse all hive SQLs, we would not need to copy
> > Calcite parser code, and the framework and the code will be very simple.
> >
> > Regarding the "Go Beyond Hive" section, is that in the scope of this FLIP?
> > Could you list all the extensions and give some examples?
> >
> > One minor suggestion about the name of ParserImplFactory.
> > How about renaming ParserImplFactory to DefaultParserFactory ?
> >
> > Best,
> > Godfrey
> >
> > Rui Li  于2021年2月3日周三 上午11:16写道:
> >
> > > Hi Jingsong,
> > >
> > > Thanks for your comments and they're very good questions.
> > >
> > > Regarding # Version, we need to do some tradeoff here. Choosing the
> > latest
> > > 3.x will cover all the features we want to support. But as you said,
> 3.x
> > > and 2.x can have some differences and requires more efforts to support
> > > lower versions. I decided to pick 2.x and evolve from there to support
> > new
> > > features in 3.x. Because I think most hive users, especially those who
> > are
> > > likely to be interested in this feature, are still using 2.x or even
> 1.x.
> > > So the priority is to cover 2.x and 1.x first.
> > >
> > > Regarding # Hive Codes, in my PoC, I simply copy the code and make as
> few
> > > changes as possible. I believe we can do some clean up or refactor to
> > > reduce it. With that in mind, I expect it to be over 10k lines of java
> > > code, and even more if we count ANTLR grammar files as well.
> > >
> > > Rega

[jira] [Created] (FLINK-21249) TableEnvironment does not support create method with remote environment

2021-02-03 Thread lidesheng (Jira)
lidesheng created FLINK-21249:
-

 Summary: TableEnvironment does not support create method with 
remote environment
 Key: FLINK-21249
 URL: https://issues.apache.org/jira/browse/FLINK-21249
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Affects Versions: 1.11.3
Reporter: lidesheng


I want to submit a SQL task to a remote long-running session, but TableEnvironment
has only one method, "create(EnvironmentSettings settings)". This method calls the
create method of ExecutorFactory to create the runtime environment. Furthermore,
I cannot create a new instance of TableEnvironment myself because the constructor
of TableEnvironmentImpl is protected. I have to create a new class inherited from
TableEnvironmentImpl and rewrite the logic to create a TableEnvironment with a
remote environment.

The ExecutorFactory interface has two subclasses, StreamExecutorFactory and
BlinkExecutorFactory, and both classes support create(settings, env) /
create(settings) methods, while ExecutorFactory has the create(settings) method
only.

So, if a create(settings, env) method is added to the ExecutorFactory interface,
TableEnvironment can also add a create(settings, env) method to work with a
remote environment.
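
For illustration, the requested change would look roughly like the sketch
below (hypothetical code mirroring the two-argument create(...) that the
concrete factories already have; not actual Flink source):

{code:java}
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.delegation.Executor;

// Hypothetical: lift the two-argument create(...) from the concrete factories
// (StreamExecutorFactory / BlinkExecutorFactory) into the interface itself.
interface ExecutorFactorySketch {

    Executor create(Map<String, String> properties);

    // Proposed addition: accept a pre-built environment, e.g. one created via
    // StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles),
    // so a TableEnvironment.create(...) overload could target a remote session.
    Executor create(Map<String, String> properties, StreamExecutionEnvironment env);
}
{code}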



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21248) Checkpoint problem

2021-02-03 Thread Ceyhan Kasap (Jira)
Ceyhan Kasap created FLINK-21248:


 Summary: Checkpoint problem
 Key: FLINK-21248
 URL: https://issues.apache.org/jira/browse/FLINK-21248
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Ceyhan Kasap


Hi

I am trying to upgrade from 1.5.5 to 1.12, and the checkpointing mechanism seems
to be broken in our Kafka-connector-sourced DataStream jobs.

Since there is a significant version gap and there are many
backward-incompatible / deprecated changes in the Flink runtime between
versions, I had to modify our jobs, and I noticed that checkpoint offsets are not
committed to Kafka for source connectors.

To simplify the issue I created simple reproducer projects:

[https://github.com/simpleusr/flink_problem_1.5.5]

[https://github.com/simpleusr/flink_problem_1.12.0]

It seems that there are major changes in the checkpoint infrastructure.

For 1.5.5, checkpoint cycles work as expected, as can be seen from the logs
(please note that the sample project contains a small hack in
org.apache.flink.runtime.minicluster.MiniCluster which prevents the cluster from
stopping):

*[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*

*[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job 
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*



*[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*

*[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job 
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*

However, for 1.12.0 the checkpoint cycle is stuck at the initial checkpoint:

*[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
1612339584496 for job ce255b141393a358db734db2d27ef0ea. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*

As far as I can see, the checkpoint cycle is stuck waiting in
org.apache.flink.runtime.checkpoint.CheckpointCoordinator for
coordinatorCheckpointsComplete although coordinatorsToCheckpoint is empty...

 

{{final CompletableFuture<?> coordinatorCheckpointsComplete =
        pendingCheckpointCompletableFuture.thenComposeAsync(
                (pendingCheckpoint) ->
                        OperatorCoordinatorCheckpoints
                                .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                        coordinatorsToCheckpoint,
                                        pendingCheckpoint,
                                        timer),
                timer);}}

Simply returning from 
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
 when coordinatorsToCheckpoint is empty seems to resolve the problem:

*[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*

*[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
1612339673380 for job ffb4a06302f7e60e9325f32340d299b2. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*

*[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*

*[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job 
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*

*[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*

*[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 
1612339678380 for job ffb4a06302f7e60e9325f32340d299b2. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*

*[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*

*[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job 
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms). 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
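
In code, the early return described above would look roughly like this
self-contained sketch (illustrative; not the actual Flink patch):

{code:java}
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CoordinatorCheckpointGuard {

    // Hypothetical stand-in for the triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
    // step: return an already-completed future when there are no coordinators,
    // instead of composing a future that never completes.
    static CompletableFuture<Void> triggerAll(Collection<?> coordinatorsToCheckpoint) {
        if (coordinatorsToCheckpoint.isEmpty()) {
            return CompletableFuture.completedFuture(null); // proposed early return
        }
        // ... trigger each coordinator checkpoint and acknowledge it here ...
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        // With no coordinators, the checkpoint pipeline can proceed immediately.
        System.out.println(triggerAll(List.of()).isDone()); // prints: true
    }
}
{code}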

I have submitted a PR for this.

Please let me know if I am missing something or there is another solution
without a code change.

We need to perform the upgrade and modify our jobs as soon 

Re: [VOTE] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-02-03 Thread Zhu Zhu
+1 (binding)

Thanks,
Zhu

Yang Wang  于2021年2月2日周二 下午7:11写道:

> +1 (non-binding)
>
> Best,
> Yang
>
> Chesnay Schepler  于2021年2月2日周二 下午5:50写道:
>
> > +1
> >
> > On 2/2/2021 10:11 AM, Till Rohrmann wrote:
> > > +1 (binding)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Feb 1, 2021 at 5:38 AM Xintong Song 
> > wrote:
> > >
> > >> +1 (binding)
> > >>
> > >> Thank you~
> > >>
> > >> Xintong Song
> > >>
> > >>
> > >>
> > >> On Mon, Feb 1, 2021 at 11:56 AM Yangze Guo 
> wrote:
> > >>
> > >>> Hi everyone,
> > >>>
> > >>> I'd like to start the vote of FLIP-156 [1]. This FLIP is discussed in
> > >>> the thread[2].
> > >>>
> > >>> The vote will be open for at least 72 hours. Unless there is an
> > >> objection,
> > >>> I will try to close it by February 4, 2021 if we have received
> > >>> sufficient votes.
> > >>>
> > >>> [1]
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
> > >>> [2]
> > >>>
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-156-Runtime-Interfaces-for-Fine-Grained-Resource-Requirements-td47650.html
> > >>> Best,
> > >>> Yangze Guo
> > >>>
> >
> >
>