[jira] [Commented] (FLINK-12089) Broadcasting Dynamic Configuration

2019-04-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813021#comment-16813021
 ] 

vinoyang commented on FLINK-12089:
--

[~langjingxiang] Why do you want this feature? I am interested in it. What's 
the benefit if the Job Manager schedules the confFunction? IMO, you should 
provide more motivation for your proposal.

> Broadcasting Dynamic Configuration
> --
>
> Key: FLINK-12089
> URL: https://issues.apache.org/jira/browse/FLINK-12089
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Scala, Runtime / Network
>Affects Versions: 1.7.2
>Reporter: langjingxiang
>Priority: Critical
>
> Flink currently supports broadcasting dynamic configuration in two ways:
> DataStream: broadcast a stream and join it
> DataSet: use a broadcast data set
> However, both approaches are fairly intrusive to the user's business logic.
> I think we can design an API:
> env.broadcast(confName, confFunction, schedulingTime)
> dataStream.map(new RichMapFunction() {
> public void open(Configuration parameters) throws Exception {
> Object broadcastSet = getRuntimeContext().getBroadcastVariable(confName);
> }})
>  
> The Job Manager would schedule confFunction and broadcast its result to the Task context.
>  
>  
>  
>  
>  
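For illustration only, here is a minimal plain-Java toy model of the proposed semantics (the class and method names below are made up for this sketch and are not an existing or agreed Flink API): a coordinator periodically evaluates confFunction on a schedule, publishes the result under confName, and tasks read the latest value from their context.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Toy model of the proposed semantics, not a Flink API. */
class DynamicConfBroadcaster {
    private final Map<String, Object> taskContext = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    /** Periodically evaluate confFunction and publish the result under confName. */
    void broadcast(String confName, Supplier<Object> confFunction, long schedulingTimeMs) {
        scheduler.scheduleAtFixedRate(
            () -> taskContext.put(confName, confFunction.get()),
            0, schedulingTimeMs, TimeUnit.MILLISECONDS);
    }

    /** What a task would read from its runtime context. */
    Object getBroadcastVariable(String confName) {
        return taskContext.get(confName);
    }
}
{code}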



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12125) Add OVH to poweredby.zh.md and index.zh.md

2019-04-08 Thread Ji Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ji Liu reassigned FLINK-12125:
--

Assignee: Ji Liu

> Add OVH to poweredby.zh.md and index.zh.md
> --
>
> Key: FLINK-12125
> URL: https://issues.apache.org/jira/browse/FLINK-12125
> Project: Flink
>  Issue Type: Task
>  Components: chinese-translation, Project Website
>Reporter: Fabian Hueske
>Assignee: Ji Liu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> OVH was added to the {{poweredby.md}} and {{index.md}} pages in commits 
> 55ae4426d5b91695e1e5629b1d0a16b7a1e010f0 and 
> d4a160ab336c5ae1b2f772fbeff7e003478e274b. See also PR 
> https://github.com/apache/flink-web/pull/193.
> The corresponding Chinese pages should be updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12125) Add OVH to poweredby.zh.md and index.zh.md

2019-04-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12125:
---
Labels: pull-request-available  (was: )

> Add OVH to poweredby.zh.md and index.zh.md
> --
>
> Key: FLINK-12125
> URL: https://issues.apache.org/jira/browse/FLINK-12125
> Project: Flink
>  Issue Type: Task
>  Components: chinese-translation, Project Website
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> OVH was added to the {{poweredby.md}} and {{index.md}} pages in commits 
> 55ae4426d5b91695e1e5629b1d0a16b7a1e010f0 and 
> d4a160ab336c5ae1b2f772fbeff7e003478e274b. See also PR 
> https://github.com/apache/flink-web/pull/193.
> The corresponding Chinese pages should be updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273327251
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 Review comment:
   Makes sense to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8125: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSourceTask

2019-04-08 Thread GitBox
flinkbot commented on issue #8125: [FLINK-10205] Batch Job: InputSplit Fault 
tolerant for DataSourceTask
URL: https://github.com/apache/flink/pull/8125#issuecomment-481103374
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Ryantaocer opened a new pull request #8125: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSourceTask

2019-04-08 Thread GitBox
Ryantaocer opened a new pull request #8125: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSourceTask
URL: https://github.com/apache/flink/pull/8125
 
 
   (**This is on top of [PR #6684](https://github.com/apache/flink/pull/6684/) to address its review comments and conflicts with apache/flink master.**)
   
   
   
   ## What is the purpose of the change
   
   Today the DataSourceTask pulls InputSplits from the JobManager to achieve better performance. However, when a DataSourceTask fails and reruns, it will not get the same splits as its previous attempt. This can introduce inconsistent results or even data corruption.
   
   Furthermore, if two executions run at the same time (in a batch scenario), these two executions should process the same splits.
   
   We need to fix this issue to make the inputs of a DataSourceTask deterministic. The proposal is to save all splits in the ExecutionVertex, and the DataSourceTask will pull splits from there.
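   As a rough, self-contained illustration of the caching idea (a sketch under assumed names such as `InputSplitCache`, not the code in this PR): splits handed out per request sequence number are remembered, so a restarted execution attempt replays exactly the splits its predecessor received.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Supplier;
   
   /** Illustrative sketch: cache assigned splits per request sequence number. */
   class InputSplitCache<S> {
       private final List<S> assignedSplits = new ArrayList<>();
       private final Supplier<S> assigner; // e.g. wraps the job's split assigner
   
       InputSplitCache(Supplier<S> assigner) {
           this.assigner = assigner;
       }
   
       synchronized S getNextInputSplit(int sequenceNumber) {
           // Replay: a retried request or a restarted attempt gets the cached split.
           if (sequenceNumber < assignedSplits.size()) {
               return assignedSplits.get(sequenceNumber);
           }
           // New request: pull from the assigner and remember the result.
           S split = assigner.get();
           assignedSplits.add(split);
           return split;
       }
   }
   ```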
   
   
   ## Brief change log
   
 - *The JobMaster gets the next input split from the Execution*
 - *The Execution forwards getNextInputSplit, together with the request's sequence number, to the ExecutionVertex*
 - *If the sequence number already exists in the ExecutionVertex, the cached split is returned; otherwise it is calculated and cached*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
  - *covered by existing tests*
  - *Added a new test that validates calling getNextInputSplit multiple times with different Execution attempts per ExecutionVertex*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on issue #7844: [FLINK-10932][ResourceManager] Initial flink-kubernetes module with e…

2019-04-08 Thread GitBox
wangyang0918 commented on issue #7844: [FLINK-10932][ResourceManager] Initial 
flink-kubernetes module with e…
URL: https://github.com/apache/flink/pull/7844#issuecomment-481102392
 
 
   Hi @tillrohrmann, could you help get this PR committed so that the follow-up work can be started?
   We already have a Kubernetes integration in Blink and hope to get involved and make some contributions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12131) Resetting ExecutionVertex in region failover may cause inconsistency of IntermediateResult status

2019-04-08 Thread Zhu Zhu (JIRA)
Zhu Zhu created FLINK-12131:
---

 Summary: Resetting ExecutionVertex in region failover may cause 
inconsistency of IntermediateResult status
 Key: FLINK-12131
 URL: https://issues.apache.org/jira/browse/FLINK-12131
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu


Currently the *IntermediateResult* status is only reset when its producer 
*ExecutionJobVertex* is reset.

When the region failover strategy is enabled, the failed region vertices are reset 
through *ExecutionVertex.resetForNewExecution()*. The *numberOfRunningProducers* 
counter in the *IntermediateResult*, however, is not properly adjusted in this case.

So if a FINISHED vertex is restarted and finishes again, the counter may drop 
below 0.

Besides, the consumable property of the partition is not reset either. This 
may lead to an incorrect input state check result for lazy scheduling.

I'd propose to invoke *IntermediateResultPartition.resetForNewExecution()* from 
*ExecutionVertex.resetForNewExecution()* and to reset the 
*numberOfRunningProducers* counter and the *IntermediateResultPartition* there.
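To make the counter problem concrete, here is a tiny toy model (plain Java; *IntermediateResultModel* and its methods are made up for illustration and are not Flink's actual classes):

{code:java}
/** Toy model of the running-producer count of an intermediate result. */
class IntermediateResultModel {
    private int numberOfRunningProducers;

    IntermediateResultModel(int producers) {
        this.numberOfRunningProducers = producers;
    }

    /** Called when a producer vertex reaches FINISHED. */
    void producerFinished() {
        numberOfRunningProducers--;
    }

    /**
     * Proposed behaviour: when a FINISHED producer is reset by a region
     * failover, it must be counted as running again; otherwise its second
     * "finish" pushes the counter below zero.
     */
    void producerReset() {
        numberOfRunningProducers++;
    }

    boolean allProducersFinished() {
        return numberOfRunningProducers == 0;
    }
}
{code}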

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12125) Add OVH to poweredby.zh.md and index.zh.md

2019-04-08 Thread Ji Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813003#comment-16813003
 ] 

Ji Liu commented on FLINK-12125:


Hi [~fhueske], is anyone working on this? If not, I would like to take it.

> Add OVH to poweredby.zh.md and index.zh.md
> --
>
> Key: FLINK-12125
> URL: https://issues.apache.org/jira/browse/FLINK-12125
> Project: Flink
>  Issue Type: Task
>  Components: chinese-translation, Project Website
>Reporter: Fabian Hueske
>Priority: Major
>
> OVH was added to the {{poweredby.md}} and {{index.md}} pages in commits 
> 55ae4426d5b91695e1e5629b1d0a16b7a1e010f0 and 
> d4a160ab336c5ae1b2f772fbeff7e003478e274b. See also PR 
> https://github.com/apache/flink-web/pull/193.
> The corresponding Chinese pages should be updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10049) Unify the processing logic for NULL arguments in SQL built-in functions

2019-04-08 Thread Xingcan Cui (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813002#comment-16813002
 ] 

Xingcan Cui commented on FLINK-10049:
-

Hey guys, thanks for the comments. I've not gone through all the documents, but 
it seems true that different SQL engines have different mechanisms for this 
problem. However, since all the fields in Flink SQL are nullable in the current 
version, simply throwing an NPE and terminating the execution should always be 
avoided.

IMO, each UDF is responsible for handling {{NULL}} arguments itself, with the 
correct semantics. {{NULL}} means unknown in SQL, and thus most scalar 
functions should output "unknown" for an unknown input. We could add the 
{{RETURNS NULL ON NULL INPUT}} option to UDF definitions (maybe as a method to be 
overridden), but it works more like an optimization, which means that even 
without this declaration, the function should return {{NULL}} after being invoked 
(just in case).
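
As a concrete illustration of the "unknown in, unknown out" semantics (a minimal sketch only; {{SafeLog10}} is a made-up example UDF, not a built-in function):

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

/**
 * A user-defined scalar function that returns NULL for a NULL argument
 * instead of throwing an NPE.
 */
public class SafeLog10 extends ScalarFunction {
    public Double eval(Double x) {
        if (x == null) {
            return null; // NULL means "unknown", so the result is unknown as well
        }
        return Math.log10(x);
    }
}
{code}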

Actually, there's no need to unify the processing logic. Just keep the correct 
semantics and avoid terminating the (continuous) queries unexpectedly. Thus, I 
plan to rename this ticket to "Correctly handle NULL arguments in SQL built-in 
functions". As for the exception handling mechanism, it's a little bit 
different and we'd better discuss it in another place.

What do you think?

> Unify the processing logic for NULL arguments in SQL built-in functions
> ---
>
> Key: FLINK-10049
> URL: https://issues.apache.org/jira/browse/FLINK-10049
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Xingcan Cui
>Assignee: vinoyang
>Priority: Major
>
> Currently, the built-in functions treat NULL arguments in different ways. 
> E.g., ABS(NULL) returns NULL, while LOG10(NULL) throws an NPE. The general 
> SQL-way of handling NULL values should be that if one argument is NULL the 
> result is NULL. We should unify the processing logic for that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] carp84 commented on a change in pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
carp84 commented on a change in pull request #8122: [FLINK-12121] [State 
Backends] Use composition instead of inheritance for the InternalKeyContext 
logic in backend
URL: https://github.com/apache/flink/pull/8122#discussion_r273322899
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ##
 @@ -175,8 +173,8 @@ public void dispose() {
@Override
public void setCurrentKey(K newKey) {
notifyKeySelected(newKey);
-   this.currentKey = newKey;
-   this.currentKeyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
+   this.keyContext.setCurrentKey(newKey);
 
 Review comment:
   Maybe keeping the setters/getters in pairs is a better idea? Wdyt? Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
carp84 commented on a change in pull request #8122: [FLINK-12121] [State 
Backends] Use composition instead of inheritance for the InternalKeyContext 
logic in backend
URL: https://github.com/apache/flink/pull/8122#discussion_r273322040
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
 ##
 @@ -143,6 +143,7 @@ public Void restore() throws Exception {
}
 
keySerializerRestored = true;
+   
keyContext.setCurrentKeySerializer(keySerializerProvider.currentSchemaSerializer());
 
 Review comment:
   Because the serializer might be re-configured in 
`setPreviousSerializerSnapshotForRestoredState`, please check the 
[implementation](https://github.com/apache/flink/blob/3d5b6b166202c423b1529316bb9a9a755628ae1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L335)
 in `StateSerializerProvider.EagerlyRegisteredStateSerializerProvider`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12130) Apply command line options to configuration before install security modules

2019-04-08 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12130:

Description: 
Currently, if the user configures Kerberos credentials through the command line, it 
won't work.
{code:java}
// flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab -yD security.kerberos.login.principal=xxx /path/to/test.jar
{code}
The above command causes a security failure if you do not have a ticket cache 
(created with kinit).

Maybe we could call 
_org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
  before _SecurityUtils.install(new SecurityConfiguration(cli.configuration));_

Here is a demo patch: 
[https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]

  was:
Currently if the user configures Kerberos credentials through command line, it 
won't work.
{code:java}
// flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab 
-yD security.kerberos.login.principal=xxx /path/to/test.jar
{code}
Maybe we could call 
_org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
 ** before _SecurityUtils.install(new 
SecurityConfiguration(cli.configuration));_

Here is a demo patch: 
[https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]


> Apply command line options to configuration before install security modules
> ---
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Priority: Major
>
> Currently if the user configures Kerberos credentials through command line, 
> it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> Above command would cause security failure if you do not have a ticket cache 
> w/ kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]
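For illustration, a minimal sketch of the proposed ordering (the helper name and the use of java.util.Properties below are assumptions made for this example; only the SecurityUtils/SecurityConfiguration calls appear in the ticket):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;

import java.util.Properties;

public class SecuritySetupSketch {

    /**
     * Merge -yD style dynamic properties into the configuration *before*
     * installing the security modules, so that Kerberos keytab/principal
     * settings passed on the command line actually take effect.
     */
    static void installSecurityWithDynamicProperties(
            Configuration config, Properties dynamicProperties) throws Exception {
        for (String key : dynamicProperties.stringPropertyNames()) {
            config.setString(key, dynamicProperties.getProperty(key));
        }
        SecurityUtils.install(new SecurityConfiguration(config));
    }
}
{code}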



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12130) Apply command line options to configuration before install security modules

2019-04-08 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12130:

Description: 
Currently if the user configures Kerberos credentials through command line, it 
won't work.
{code:java}
// flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab 
-yD security.kerberos.login.principal=xxx /path/to/test.jar
{code}
Maybe we could call 
_org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
 ** before _SecurityUtils.install(new 
SecurityConfiguration(cli.configuration));_

Here is a demo patch: 
[https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]

  was:
Currently if the user configures Kerberos credentials through command line, it 
won't work.
{code:java}
// flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab 
-yD security.kerberos.login.principal=xxx /path/to/test.jar
{code}
Maybe we could call 
_org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
 ** before _SecurityUtils.install(new 
SecurityConfiguration(cli.configuration));_

Here is a demo path: 
[https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]


> Apply command line options to configuration before install security modules
> ---
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Priority: Major
>
> Currently if the user configures Kerberos credentials through command line, 
> it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>  ** before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12130) Apply command line options to configuration before install security modules

2019-04-08 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12130:

Description: 
Currently if the user configures Kerberos credentials through command line, it 
won't work.
{code:java}
// flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab 
-yD security.kerberos.login.principal=xxx /path/to/test.jar
{code}
Maybe we could call 
_org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
 ** before _SecurityUtils.install(new 
SecurityConfiguration(cli.configuration));_

Here is a demo path: 
[https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]

  was:
Currently if the user configures Kerberos credentials through command line, it 
won't work.
{code:java}
// flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab 
-yD security.kerberos.login.principal=xxx /path/to/test.jar
{code}
Maybe we could call 
_org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
 ** before _SecurityUtils.install(new 
SecurityConfiguration(cli.configuration));_


> Apply command line options to configuration before install security modules
> ---
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Priority: Major
>
> Currently if the user configures Kerberos credentials through command line, 
> it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>  ** before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo path: 
> [https://github.com/jiasheng55/flink/commit/f2c63b97bdb7d6067deb7a48caf72958abb2903a]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12130) Apply command line options to configuration before install security modules

2019-04-08 Thread Victor Wong (JIRA)
Victor Wong created FLINK-12130:
---

 Summary: Apply command line options to configuration before 
install security modules
 Key: FLINK-12130
 URL: https://issues.apache.org/jira/browse/FLINK-12130
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client
Reporter: Victor Wong


Currently if the user configures Kerberos credentials through command line, it 
won't work.
{code:java}
// flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab 
-yD security.kerberos.login.principal=xxx /path/to/test.jar
{code}
Maybe we could call 
_org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
 ** before _SecurityUtils.install(new 
SecurityConfiguration(cli.configuration));_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273318302
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   After thinking through this issue again, if we keep maintaining 
`isCreditBased` in `NetworkEnvironmentConfiguration`, we have to keep this 
property consistent between `NetworkEnvironmentConfiguration` and the Flink 
`Configuration` in `NettyConfig`. So the code seems ugly in 
`NetworkEnvironmentConfigurationBuilder`. Then, at the moment, I would prefer 
to add this property in the constructor of 
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273318161
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
 
 Review comment:
   I think it's better to override it. But I am fine if you do not want to do it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273307816
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   I even considered adding one more parameter `isCreditBased` to 
`NettyConnectionManager`. But `TaskManagerOptions.NETWORK_CREDIT_MODEL` is 
`Deprecated`, and it is supposed to be removed from FLINK-1.6. AFAIK, this 
temporary option will finally be abandoned once the outstanding network issue 
is confirmed. So it might not be good to reflect it in the constructor of 
`NettyConnectionManager`.

[jira] [Assigned] (FLINK-3991) Remove deprecated configuration keys from ConfigConstants

2019-04-08 Thread Ji Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-3991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ji Liu reassigned FLINK-3991:
-

Assignee: Ji Liu

> Remove deprecated configuration keys from ConfigConstants
> -
>
> Key: FLINK-3991
> URL: https://issues.apache.org/jira/browse/FLINK-3991
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Ji Liu
>Priority: Major
> Fix For: 2.0.0
>
>
> In 
> https://github.com/apache/flink/commit/b0acd97935cd21843bac3b9b5afa3662b52bb95d#diff-40616c4678c3fbfe07c0701505ce0567
>  I deprecated some configuration keys.
> They are unused and need to be removed with the 2.0 release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9787) Change ExecutionConfig#getGlobalJobParameters to return an instance of GlobalJobParameters instead of null if no custom globalJobParameters are set yet

2019-04-08 Thread Ji Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ji Liu reassigned FLINK-9787:
-

Assignee: Ji Liu

> Change ExecutionConfig#getGlobalJobParameters to return an instance of 
> GlobalJobParameters instead of null if no custom globalJobParameters are set 
> yet
> ---
>
> Key: FLINK-9787
> URL: https://issues.apache.org/jira/browse/FLINK-9787
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Florian Schmidt
>Assignee: Ji Liu
>Priority: Minor
> Fix For: 2.0.0
>
>
> Currently, when accessing ExecutionConfig#getGlobalJobParameters, this will 
> return `null` if no globalJobParameters are set. This can easily lead to 
> NullPointerExceptions when used with getGlobalJobParameters.toMap().
> An easy improvement would be to just return a new instance of 
> GlobalJobParameters, with an empty map as the parameters, if none is set.
> This would be a breaking change since we expose this via 
> RuntimeContext#getExecutionConfig.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7916) Remove NetworkStackThroughputITCase

2019-04-08 Thread Ji Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ji Liu reassigned FLINK-7916:
-

Assignee: Ji Liu

> Remove NetworkStackThroughputITCase
> ---
>
> Key: FLINK-7916
> URL: https://issues.apache.org/jira/browse/FLINK-7916
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Ji Liu
>Priority: Major
>
> Flink's code base contains the {{NetworkStackThroughputITCase}}, which is not 
> really a test. Moreover, it is marked as {{Ignored}}. I propose to remove this 
> test because it is more of a benchmark. We could think about creating a 
> benchmark project to which we move this kind of "test".
> In general, I think we should remove ignored tests if they won't be fixed 
> immediately. The danger is far too high that we forget about them and then we 
> only keep their maintenance burden. This is especially true for the above 
> mentioned test case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12084) Flink API extensions do not seems to include split method

2019-04-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812977#comment-16812977
 ] 

vinoyang commented on FLINK-12084:
--

[~Jac] Considering the {{split}} API has been marked as {{Deprecated}}, I 
don't know whether it's valuable or not. cc [~aljoscha] What's your opinion?

> Flink API extensions do not seems to include split method
> -
>
> Key: FLINK-12084
> URL: https://issues.apache.org/jira/browse/FLINK-12084
> Project: Flink
>  Issue Type: Wish
>  Components: API / Scala
>Affects Versions: 1.7.2
>Reporter: Jacopo Gobbi
>Priority: Minor
>
> In the list of Scala API extensions, Flink does not seem to provide a method 
> for the split function to be deconstructed using a partial function.
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12080) Add timestamp property to savepoint

2019-04-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812973#comment-16812973
 ] 

vinoyang commented on FLINK-12080:
--

[~yunta] The proposal to add a timestamp to savepoints sounds valuable; I 
think it's a good metadata property.

> Add timestamp property to savepoint
> ---
>
> Key: FLINK-12080
> URL: https://issues.apache.org/jira/browse/FLINK-12080
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: jeremy huang
>Priority: Major
>
> Savepoints don't have a timestamp property, and when restoring from a savepoint 
> we can't get the timestamp from the log. I wonder if we can add a timestamp to 
> savepoints.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-08 Thread yankai zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812967#comment-16812967
 ] 

yankai zhang edited comment on FLINK-12113 at 4/9/19 3:20 AM:
--

Yes, _fromCollection(Iterator, Class)_ works as expected without an anonymous 
class.

The problem here is that an anonymous class object created in an instance method 
implicitly references the outer _this_ (even though it is not actually used), 
while the outer _this_ is not serializable; removing such references is exactly 
what _StreamExecutionEnvironment#clean_ is supposed to do.

In fact, the iterator passed by the user is wrapped in a 
_FromIteratorFunction_, and _StreamExecutionEnvironment#clean_ is then called 
on that wrapper instance, not on the iterator itself. However, the current 
implementation of _StreamExecutionEnvironment#clean_ is not recursive, so it 
can't find and clean the _this_ reference deeply nested in the closure.

Here is my fully reproducible code:
{code:java}
public class MainTest {


interface IS extends Iterator, Serializable {
}

@Test
public void cleanTest() {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(new IS() {
@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
}, Object.class);
}
}{code}


was (Author: vision57):
Yes, _fromCollection(Iterator, Class)_ works well as expected without anonymous 
class.

Problem here is anonymous class object in instance method implicitly references 
outer _this_(but not actually used), while outer _this_ is not serializable, 
and this is exactly what _StreamExecutionEnvironment#clean_ supposed to do.

In act, the iterator passed by user is wrapped within a _FromIteratorFunction_, 
and then _StreamExecutionEnvironment#clean_ is called on that wrapper __ 
instance, not the iterator itself. However current implementation of 
_StreamExecutionEnvironment#clean_ is not recursive, it can't find and clean 
_this_ deeply nested in closure.

Here is my fully reproducible code:
{code:java}
public class MainTest {


interface IS extends Iterator, Serializable {
}

@Test
public void cleanTest() {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(new IS() {
@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
}, Object.class);
}
}{code}

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> Code piece above throws exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-08 Thread yankai zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812967#comment-16812967
 ] 

yankai zhang commented on FLINK-12113:
--

Yes, _fromCollection(Iterator, Class)_ works as expected without an anonymous 
class.

The problem here is that an anonymous class object created in an instance method 
implicitly references the outer _this_ (even though it is not actually used), 
while the outer _this_ is not serializable; removing such references is exactly 
what _StreamExecutionEnvironment#clean_ is supposed to do.

In fact, the iterator passed by the user is wrapped in a 
_FromIteratorFunction_, and _StreamExecutionEnvironment#clean_ is then called 
on that wrapper instance, not on the iterator itself. However, the current 
implementation of _StreamExecutionEnvironment#clean_ is not recursive, so it 
can't find and clean the _this_ reference deeply nested in the closure.

Here is my fully reproducible code:
{code:java}
public class MainTest {


interface IS extends Iterator, Serializable {
}

@Test
public void cleanTest() {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(new IS() {
@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
}, Object.class);
}
}{code}

> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
> Attachments: image-2019-04-07-21-52-37-264.png, 
> image-2019-04-08-23-19-27-359.png
>
>
>  
> {code:java}
> interface IS extends Iterator, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> Code piece above throws exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12009) Wrong check message about heartbeat interval for HeartbeatServices

2019-04-08 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12009.
---
   Resolution: Fixed
Fix Version/s: 1.8.1
   1.9.0
   1.7.3

Fixed in master: dca64c20d9524bdcd4d8d6d399639a7bbd1e5ccb
Fixed in release-1.8: 03d0a398e3402e7566c1a396586e7fff9414e2e9
Fixed in release-1.7: 5c4ed0e0a44017701229b6063ff8203b138757d5

> Wrong check message about heartbeat interval for HeartbeatServices
> --
>
> Key: FLINK-12009
> URL: https://issues.apache.org/jira/browse/FLINK-12009
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Bruno Aranda
>Assignee: vinoyang
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I am seeing:
> {code:java}
> The heartbeat timeout should be larger or equal than the heartbeat 
> timeout{code}
> due to bad configuration. I guess it should be instead:
> {code:java}
> The heartbeat interval should be larger or equal than the heartbeat 
> timeout{code}
> at:
> https://github.com/apache/flink/blob/1f0e036bbf6a37bb83623fb62d4900d7c28a5e1d/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java#L43



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2019-04-08 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812955#comment-16812955
 ] 

Liya Fan commented on FLINK-4785:
-

[~f.pompermaier] Thanks for the information. I can also access it now.

I think this is a known issue. Flink does not support the standard CSV file 
format as specified by the RFC.

So it does not handle some special cases very well, such as commas inside 
quotes, consecutive double quotes, etc.

For more details, please see https://issues.apache.org/jira/browse/FLINK-10684

> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>Priority: Major
>  Labels: csv
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/Csv2RowExample.java



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273307816
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   I even considered adding one more parameter `isCreditBased` to 
`NettyConnectionManager`. But `TaskManagerOptions.NETWORK_CREDIT_MODEL` is 
`Deprecated`, and it is supposed to be removed from FLINK-1.6. I think this 
temporary option will finally be abandoned once the outstanding network issue 
is confirmed. So it might not be good to reflect it in the constructor of 
`NettyConnectionManager`.

[GitHub] [flink] yanghua commented on issue #8055: [FLINK-12024] Bump universal Kafka connector to Kafka dependency to 2.2.0

2019-04-08 Thread GitBox
yanghua commented on issue #8055: [FLINK-12024] Bump universal Kafka connector 
to Kafka dependency to 2.2.0
URL: https://github.com/apache/flink/pull/8055#issuecomment-481080653
 
 
   @pnowojski Thanks for pointing out these version numbers. I have upgraded them to 
`2.2.0`, except for:
   
`flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/META-INF/NOTICE:Azure
 Data Lake Store - Java client SDK 2.0.11`.
   
   Moreover, I found that the first arg `KAFKA_CONNECTOR_VERSION` in 
`test_sql_client_kafka_common.sh` seems useless. I suggest we remove it in 
another PR in the future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8048: [FLINK-12009] Fix wrong check message about heartbeat interval for HeartbeatServices

2019-04-08 Thread GitBox
asfgit closed pull request #8048: [FLINK-12009] Fix wrong check message about 
heartbeat interval for HeartbeatServices
URL: https://github.com/apache/flink/pull/8048
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-08 Thread GitBox
flinkbot commented on issue #8124: [FLINK-11877] Implement the runtime handling 
of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#issuecomment-481080032
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12129) CompressedViews need release heap buffers to reduce memory usage in blink

2019-04-08 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-12129:


 Summary: CompressedViews need release heap buffers to reduce 
memory usage in blink
 Key: FLINK-12129
 URL: https://issues.apache.org/jira/browse/FLINK-12129
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Jingsong Lee


In BinaryHashPartition, CompressedViews are kept around for a long time. The heap buffers of a view whose spill has finished are no longer needed and should be released.

See CompressedBlockChannelWriter.compressedBuffers and similar fields.
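
A minimal sketch of the proposed release step, using hypothetical field and method names rather than the actual classes in flink-table-runtime-blink:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch: once spilling has finished, the cached heap buffers
    // are no longer needed, so dropping the references lets the GC reclaim them.
    public class CompressedViewSketch {

        private final List<byte[]> compressedBuffers = new ArrayList<>();

        public void releaseHeapBuffersAfterSpill() {
            compressedBuffers.clear();
        }
    }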



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10725) Support for Java 11 (LTS)

2019-04-08 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812950#comment-16812950
 ] 

Liya Fan commented on FLINK-10725:
--

[~aljoscha] Thanks a lot for the information. 

[~Zentol] If there is anything I can do, please feel free to contact me.

> Support for Java 11 (LTS)
> -
>
> Key: FLINK-10725
> URL: https://issues.apache.org/jira/browse/FLINK-10725
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.8.0, 2.0.0
>Reporter: Sina Madani
>Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Java 8 is over 5 years old and will be end of life in 2019/2020. Java 11, the 
> latest long-term support release, became GA in September 2018. Given that 
> FLINK-8033 still hasn't been resolved and that Java 9 was end of life 
> (discontinued / no longer publically available or supported) since March 
> 2018, it doesn't make sense to continue trying to add Java 9 support when 
> both Java 9 and Java 10 are end-of-life.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunhaibotb opened a new pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-08 Thread GitBox
sunhaibotb opened a new pull request #8124: [FLINK-11877] Implement the runtime 
handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124
 
 
   ## What is the purpose of the change
   
   This pull request makes the Flink runtime not throw a "java.lang.OutOfMemoryError: Direct Buffer Memory" error when loading an offloaded TDD of large size. The loading uses NIO's Files.readAllBytes() to read the serialized TDD, and in the call stack of Files.readAllBytes() a direct memory buffer whose size equals the length of the file is allocated. This causes an OutOfMemoryError when the file is very large.
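
   As an illustration of the problem described above, the general technique of reading a large file in bounded heap chunks avoids any single allocation as large as the file itself; this is only a sketch of that technique, not necessarily the change made in this pull request:

   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public final class BoundedFileRead {

       // Reads the whole file into a byte[] using a fixed-size heap chunk,
       // so no single buffer needs to be as large as the file itself.
       static byte[] readAllBounded(Path path) throws IOException {
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           byte[] chunk = new byte[64 * 1024];
           try (InputStream in = Files.newInputStream(path)) {
               int read;
               while ((read = in.read(chunk)) != -1) {
                   out.write(chunk, 0, read);
               }
           }
           return out.toByteArray();
       }
   }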
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11877) Implement the runtime handling of the InputSelectable interface

2019-04-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11877:
---
Labels: pull-request-available  (was: )

> Implement the runtime handling of the InputSelectable interface
> ---
>
> Key: FLINK-11877
> URL: https://issues.apache.org/jira/browse/FLINK-11877
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>  Labels: pull-request-available
>
> - Introduces a new class `Input` to represent the logical input of operators.
>  - Introduces a new class `StreamTwoInputSelectableProcessor` to implement 
> selectively reading.
>  - Adds benchmarks for `StreamTwoInputProcessor` and 
> `StreamTwoInputSelectableProcessor` to ensure that 
> StreamTwoInputSelectableProcessor's throughput is the same or the regression 
> is acceptable in the case of constant `ALL`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8048: [FLINK-12009] Fix wrong check message about heartbeat interval for HeartbeatServices

2019-04-08 Thread GitBox
flinkbot edited a comment on issue #8048: [FLINK-12009] Fix wrong check message 
about heartbeat interval for HeartbeatServices
URL: https://github.com/apache/flink/pull/8048#issuecomment-476489178
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8048: [FLINK-12009] Fix wrong check message about heartbeat interval for HeartbeatServices

2019-04-08 Thread GitBox
sunjincheng121 commented on issue #8048: [FLINK-12009] Fix wrong check message 
about heartbeat interval for HeartbeatServices
URL: https://github.com/apache/flink/pull/8048#issuecomment-481078634
 
 
   Thank you for participating in 2019 flink forward, I believe that there is a 
great gain. @yanghua!
   
   Merging...
   
   @flinkbot approve all
   Best,
   Jincheng


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11879) Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput

2019-04-08 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-11879:
--
Summary: Add JobGraph validators for the uses of InputSelectable, 
BoundedOneInput and BoundedMultiInput  (was: Add JobGraph validators for the 
uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput)

> Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and 
> BoundedMultiInput
> --
>
> Key: FLINK-11879
> URL: https://issues.apache.org/jira/browse/FLINK-11879
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> - Rejects jobs containing operators that implement `TwoInputSelectable` when checkpointing is enabled.
>  - Rejects jobs containing operators that implement `BoundedInput` or `BoundedTwoInput` when checkpointing is enabled.
>  - Rejects jobs containing operators that implement `TwoInputSelectable` when credit-based flow control is disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11878) Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-04-08 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-11878:
--
Summary: Implement the runtime handling of BoundedOneInput and 
BoundedMultiInput  (was: Implement the runtime handling of BoundedOneInput and 
BoundedTwoInput)

> Implement the runtime handling of BoundedOneInput and BoundedMultiInput
> ---
>
> Key: FLINK-11878
> URL: https://issues.apache.org/jira/browse/FLINK-11878
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on issue #8048: [FLINK-12009] Fix wrong check message about heartbeat interval for HeartbeatServices

2019-04-08 Thread GitBox
yanghua commented on issue #8048: [FLINK-12009] Fix wrong check message about 
heartbeat interval for HeartbeatServices
URL: https://github.com/apache/flink/pull/8048#issuecomment-481077182
 
 
   @azagrebin @sunjincheng121 Thanks for pointing the mistake and sorry for the 
late reply, I attended the Flink Forward SF 2019 and just come back today. I 
have updated the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tianchen92 commented on issue #8121: [FLINK-10437]Some of keys under withDeprecatedKeys aren't marked as @…

2019-04-08 Thread GitBox
tianchen92 commented on issue #8121: [FLINK-10437]Some of keys under 
withDeprecatedKeys aren't marked as @…
URL: https://github.com/apache/flink/pull/8121#issuecomment-481077070
 
 
   hi, @rmetzger could you please help review this PR? thx!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11877) Implement the runtime handling of the InputSelectable interface

2019-04-08 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-11877:
--
Summary: Implement the runtime handling of the InputSelectable interface  
(was: Implement the runtime handling of TwoInputSelectable)

> Implement the runtime handling of the InputSelectable interface
> ---
>
> Key: FLINK-11877
> URL: https://issues.apache.org/jira/browse/FLINK-11877
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> - Introduces a new class `Input` to represent the logical input of operators.
>  - Introduces a new class `StreamTwoInputSelectableProcessor` to implement 
> selectively reading.
>  - Adds benchmarks for `StreamTwoInputProcessor` and 
> `StreamTwoInputSelectableProcessor` to ensure that 
> StreamTwoInputSelectableProcessor's throughput is the same or the regression 
> is acceptable in the case of constant `ALL`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-08 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r273303176
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/RowSlidingOverFrame.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.table.type.RowType;
+
+/**
+ * The sliding window frame calculates frames with the following SQL form:
+ * ... ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING
+ */
+public class RowSlidingOverFrame extends SlidingOverFrame {
+
+   private final int leftOffset;
+   private final int rightOffset;
+
+   /**
+* Index of the first input row with a value greater than the upper 
bound of the current
+* output row.
+*/
+   private int inputHighIndex = 0;
+
+   /**
+* Index of the first input row with a value equal to or greater than 
the lower bound of the
+* current output row.
+*/
+   private int inputLowIndex = 0;
+
+   public RowSlidingOverFrame(
+   RowType inputType,
+   RowType valueType,
+   GeneratedAggsHandleFunction aggsHandleFunction,
+   int leftOffset,
+   int rightOffset) {
+   super(inputType, valueType, aggsHandleFunction);
+   this.leftOffset = leftOffset;
+   this.rightOffset = rightOffset;
+   }
+
+   @Override
+   public void prepare(ResettableExternalBuffer rows) throws Exception {
+   super.prepare(rows);
+   inputHighIndex = 0;
+   inputLowIndex = 0;
+   }
+
+   @Override
+   public BaseRow process(int index, BaseRow current) throws Exception {
+   boolean bufferUpdated = index == 0;
+
+   // Drop all rows from the buffer for which the input row value 
is smaller than
+   // the output row lower bound.
+   while (!buffer.isEmpty() && inputLowIndex < index + leftOffset) 
{
 
 Review comment:
   leftOffset is a negative value.
   I will change it to a positive value to make it easier to understand.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-08 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r273303052
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/RowSlidingOverFrame.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.table.type.RowType;
+
+/**
+ * The sliding window frame calculates frames with the following SQL form:
+ * ... ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING
+ */
+public class RowSlidingOverFrame extends SlidingOverFrame {
+
+   private final int leftOffset;
+   private final int rightOffset;
+
+   /**
+* Index of the first input row with a value greater than the upper 
bound of the current
 
 Review comment:
   wrong comment


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-08 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r273303013
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/RowSlidingOverFrame.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.table.type.RowType;
+
+/**
+ * The sliding window frame calculates frames with the following SQL form:
+ * ... ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING
 
 Review comment:
   I will add a detailed comment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10049) Unify the processing logic for NULL arguments in SQL built-in functions

2019-04-08 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812935#comment-16812935
 ] 

Hequn Cheng commented on FLINK-10049:
-

[~xccui] Thanks for opening the issue. Good catch and +1 to unify the 
processing logic.

This is an interesting topic and I would like to share some thoughts too. I 
think it's a big topic for the improvement of UDF which deserves a discussion 
by itself.

As for the null input problem raised by this issue, I have come up with two 
problems to be addressed.
 - How to handle Exceptions for UDF.
We may need a global configuration to control the behavior of handling 
exceptions for UDF. For example, providing a return null option.
 - How to handle NULL input and NULL output.
We can let the UDF process null input by default. However, we can also provide options like SQL Server's {{RETURNS NULL ON NULL INPUT}}, which returns NULL when any of the arguments it receives is NULL, without actually invoking the body of the function.

What do you guys think?
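
For illustration, a minimal sketch of the {{RETURNS NULL ON NULL INPUT}} behavior expressed as a plain scalar UDF (a hypothetical function, not something shipped with Flink):

    import org.apache.flink.table.functions.ScalarFunction;

    // Returns NULL whenever the argument is NULL, without executing the
    // function body, mirroring the SQL Server option mentioned above.
    public class SafeLog10 extends ScalarFunction {

        public Double eval(Double value) {
            if (value == null) {
                return null;
            }
            return Math.log10(value);
        }
    }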
 

> Unify the processing logic for NULL arguments in SQL built-in functions
> ---
>
> Key: FLINK-10049
> URL: https://issues.apache.org/jira/browse/FLINK-10049
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Xingcan Cui
>Assignee: vinoyang
>Priority: Major
>
> Currently, the built-in functions treat NULL arguments in different ways. 
> E.g., ABS(NULL) returns NULL, while LOG10(NULL) throws an NPE. The general 
> SQL-way of handling NULL values should be that if one argument is NULL the 
> result is NULL. We should unify the processing logic for that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10049) Unify the processing logic for NULL arguments in SQL built-in functions

2019-04-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812931#comment-16812931
 ] 

vinoyang commented on FLINK-10049:
--

[~twalthr] What's your opinion?

> Unify the processing logic for NULL arguments in SQL built-in functions
> ---
>
> Key: FLINK-10049
> URL: https://issues.apache.org/jira/browse/FLINK-10049
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Xingcan Cui
>Assignee: vinoyang
>Priority: Major
>
> Currently, the built-in functions treat NULL arguments in different ways. 
> E.g., ABS(NULL) returns NULL, while LOG10(NULL) throws an NPE. The general 
> SQL-way of handling NULL values should be that if one argument is NULL the 
> result is NULL. We should unify the processing logic for that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10049) Unify the processing logic for NULL arguments in SQL built-in functions

2019-04-08 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10049:


Assignee: vinoyang

> Unify the processing logic for NULL arguments in SQL built-in functions
> ---
>
> Key: FLINK-10049
> URL: https://issues.apache.org/jira/browse/FLINK-10049
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Xingcan Cui
>Assignee: vinoyang
>Priority: Major
>
> Currently, the built-in functions treat NULL arguments in different ways. 
> E.g., ABS(NULL) returns NULL, while LOG10(NULL) throws an NPE. The general 
> SQL-way of handling NULL values should be that if one argument is NULL the 
> result is NULL. We should unify the processing logic for that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on a change in pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
klion26 commented on a change in pull request #8122: [FLINK-12121] [State 
Backends] Use composition instead of inheritance for the InternalKeyContext 
logic in backend
URL: https://github.com/apache/flink/pull/8122#discussion_r273298274
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ##
 @@ -269,7 +266,7 @@ public HeapKeyedStateBackend(
}
StateTable stateTable = tryRegisterStateTable(
namespaceSerializer, stateDesc, 
getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
-   return stateFactory.createState(stateDesc, stateTable, 
getKeySerializer());
+   return stateFactory.createState(stateDesc, stateTable, 
this.getKeySerializer());
 
 Review comment:
   nit: can remove `this.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
klion26 commented on a change in pull request #8122: [FLINK-12121] [State 
Backends] Use composition instead of inheritance for the InternalKeyContext 
logic in backend
URL: https://github.com/apache/flink/pull/8122#discussion_r273298203
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ##
 @@ -236,7 +233,7 @@ public HeapKeyedStateBackend(
newStateSerializer,
snapshotTransformFactory);
 
-   stateTable = snapshotStrategy.newStateTable(this, 
newMetaInfo);
+   stateTable = 
snapshotStrategy.newStateTable(this.keyContext, newMetaInfo);
 
 Review comment:
   nit: can remove `this.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
klion26 commented on a change in pull request #8122: [FLINK-12121] [State 
Backends] Use composition instead of inheritance for the InternalKeyContext 
logic in backend
URL: https://github.com/apache/flink/pull/8122#discussion_r273297318
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ##
 @@ -94,52 +89,55 @@
/** Decorates the input and output streams to write key-groups 
compressed. */
protected final StreamCompressionDecorator keyGroupCompressionDecorator;
 
+   /** The key context for this backend. */
+   protected final InternalKeyContext keyContext;
+
public AbstractKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
TypeSerializer keySerializer,
ClassLoader userCodeClassLoader,
-   int numberOfKeyGroups,
-   KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
-   CloseableRegistry cancelStreamRegistry) {
+   CloseableRegistry cancelStreamRegistry,
+   InternalKeyContext keyContext) {
this(
kvStateRegistry,

StateSerializerProvider.fromNewRegisteredSerializer(keySerializer),
userCodeClassLoader,
-   numberOfKeyGroups,
-   keyGroupRange,
executionConfig,
ttlTimeProvider,
cancelStreamRegistry,
-   determineStreamCompression(executionConfig)
+   determineStreamCompression(executionConfig),
+   keyContext
);
}
 
public AbstractKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
StateSerializerProvider keySerializerProvider,
ClassLoader userCodeClassLoader,
-   int numberOfKeyGroups,
-   KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
CloseableRegistry cancelStreamRegistry,
-   StreamCompressionDecorator keyGroupCompressionDecorator) {
+   StreamCompressionDecorator keyGroupCompressionDecorator,
+   InternalKeyContext keyContext) {
+   Preconditions.checkNotNull(keyContext);
 
 Review comment:
   Do we need to add `checkNotNull(keyGroupCompressionDecorator)` here as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
klion26 commented on a change in pull request #8122: [FLINK-12121] [State 
Backends] Use composition instead of inheritance for the InternalKeyContext 
logic in backend
URL: https://github.com/apache/flink/pull/8122#discussion_r273300375
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
 ##
 @@ -143,6 +143,7 @@ public Void restore() throws Exception {
}
 
keySerializerRestored = true;
+   
keyContext.setCurrentKeySerializer(keySerializerProvider.currentSchemaSerializer());
 
 Review comment:
   When initializing the keyContext in `HeapKeyedStateBackendBuilder`, we have already set the serializer, so why do we need to set it here again?
   If we do not need to update the serializer here, can we remove the `setCurrentKeySerializer` method and mark `currentKeySerializer` as final in `InternalKeyContextImpl`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
klion26 commented on a change in pull request #8122: [FLINK-12121] [State 
Backends] Use composition instead of inheritance for the InternalKeyContext 
logic in backend
URL: https://github.com/apache/flink/pull/8122#discussion_r273297590
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ##
 @@ -175,8 +173,8 @@ public void dispose() {
@Override
public void setCurrentKey(K newKey) {
notifyKeySelected(newKey);
-   this.currentKey = newKey;
-   this.currentKeyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
+   this.keyContext.setCurrentKey(newKey);
 
 Review comment:
   `InternalKeyContext#setCurrentKeyGroupIndex()` is only called here (besides `InternalKeyContext#setCurrentKey`). Can we add a `setCurrentKeyAndGroup` function to `InternalKeyContext` and remove the `setCurrentKeyGroupIndex` method?
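
   A minimal sketch of the suggested combined setter, with hypothetical names rather than the actual `InternalKeyContext` interface from flink-runtime:

   public interface KeyContextSketch<K> {

       void setCurrentKey(K key);

       void setCurrentKeyGroupIndex(int keyGroupIndex);

       // Combined setter so callers such as AbstractKeyedStateBackend#setCurrentKey
       // only need a single call.
       default void setCurrentKeyAndGroup(K key, int keyGroupIndex) {
           setCurrentKey(key);
           setCurrentKeyGroupIndex(keyGroupIndex);
       }
   }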


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-08 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r273299696
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/RangeUnboundedPrecedingOverFrame.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedRecordComparator;
+import org.apache.flink.table.generated.RecordComparator;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+
+/**
+ * The UnboundPreceding window frame calculates frames with the following SQL 
form:
+ * ... ROW BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
 
 Review comment:
   Yes, I will change it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12128) There is a typo on the website

2019-04-08 Thread Kenneth Yang (JIRA)
Kenneth Yang created FLINK-12128:


 Summary: There is a typo on the website
 Key: FLINK-12128
 URL: https://issues.apache.org/jira/browse/FLINK-12128
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Kenneth Yang


[https://flink.apache.org/roadmap.html]

"Various of these enhancements can be taken _*form*_ the contributed code from 
the [Blink fork|https://github.com/apache/flink/tree/blink].;

I think this sentence has a typo, should change the *form* to _from_

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273261874
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String defaultDatabaseName = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase());
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getDefaultDatabaseName() {
+   return defaultDatabaseName;
+   }
+
+   @Override
+   public void setDefaultDatabaseName(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   defaultDatabaseName = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String dbName, boolean ignoreIfNotExists) 
throws DatabaseNotExistException {
+   if (databases.containsKey(dbName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(dbName)) {
+   databases.remove(dbName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, dbName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
dbName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   return tables.keySet().stream().noneMatch(op -> 
op.getDatabaseName().equals(databaseName));
+   // TODO: also check function when function is added.
+   }
+
+   @Override
+   public void alterDatabase(String databaseName, CatalogDatabase 
newDatabase, boolean ignoreIfNotExists)
+   throws DatabaseNotExistException {
+   if (databaseExists(databaseName)) {
+   databases.put(databaseName, 

[GitHub] [flink] sjwiesman commented on issue #8123: FLINK-12119 [build-system] add owasp-dependency-check plugin

2019-04-08 Thread GitBox
sjwiesman commented on issue #8123: FLINK-12119 [build-system] add 
owasp-dependency-check plugin
URL: https://github.com/apache/flink/pull/8123#issuecomment-481026429
 
 
   @knaufk I ran your PR locally and everything executed as expected. Based on the way this is configured, vulnerability checks are only executed manually, never as part of Travis. Is that the intent? I am not advocating for checking on every PR, but should we have this as a new nightly build so we can periodically check for newly discovered vulnerabilities?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273242001
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
+   }
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
+
+   @After
+   public void close() {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+   }
+
+   @AfterClass
 
 Review comment:
   The catalog is created and discarded after each test. Thus, the @AfterClass is not needed. Instead, I put open() and close() in @Before and @After.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273242001
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
+   }
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
+
+   @After
+   public void close() {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+   }
+
+   @AfterClass
 
 Review comment:
   Sounds good.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273236006
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.Map;
+
+/**
+ * A generic catalog table implementation.
+ */
+public class GenericCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the table
+   private final Map properties;
+   // Statistics of the table
+   private TableStats tableStats = null;
+   // Comment of the table
+   private String comment = "This is a generic catalog table.";
+
+   public GenericCatalogTable(TableSchema tableSchema, TableStats 
tableStats, Map properties) {
+   this.tableSchema = tableSchema;
+   this.tableStats = tableStats;
+   this.properties = properties;
+   }
+
+   @Override
+   public TableStats getStatistics() {
+   return this.tableStats;
+   }
+
+   @Override
+   public Map getProperties() {
+   return properties;
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   return this.tableSchema;
+   }
+
+   @Override
+   public GenericCatalogTable copy() {
+   return new GenericCatalogTable(this.tableSchema, 
this.tableStats, this.properties);
 
 Review comment:
   Good catch!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273231567
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.Map;
+
+/**
+ * A generic catalog table implementation.
+ */
+public class GenericCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the table
+   private final Map<String, String> properties;
+   // Statistics of the table
+   private TableStats tableStats = null;
 
 Review comment:
   "final" could be overkill, so I remove it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273230556
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
+   private final Map<String, String> properties;
+   private String comment;
+
+   public GenericCatalogDatabase() {
+   this(new HashMap<>(), null);
 
 Review comment:
   Good point!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
flinkbot edited a comment on issue #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#issuecomment-474185108
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @sunjincheng121 [committer]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @aljoscha [PMC], @twalthr [PMC]
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273224721
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
 
 Review comment:
   We overwrite them if the default one doesn't satisfy our needs. For a 
database, the most important part is its name, which isn't part of the object 
definition. I think it's fine if two DBs named differently happen to have the 
same definition (properties/comment). They are two different databases. 
Therefore, I believe reference equivalence is sufficient in this case.
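   For illustration, a minimal self-contained sketch of that point (the class below is a stand-in, not the actual GenericCatalogDatabase): without an equals()/hashCode() override, two definitions with identical contents are still two distinct objects.

import java.util.HashMap;
import java.util.Map;

class ReferenceEqualitySketch {

	static final class DatabaseDef {
		final Map<String, String> properties;

		DatabaseDef(Map<String, String> properties) {
			this.properties = properties;
		}
		// No equals()/hashCode() override: Object's reference equality applies.
	}

	public static void main(String[] args) {
		Map<String, String> props = new HashMap<>();
		props.put("owner", "flink");
		DatabaseDef db1 = new DatabaseDef(new HashMap<>(props));
		DatabaseDef db2 = new DatabaseDef(new HashMap<>(props));
		// Same definition, but two different databases, as argued above.
		System.out.println(db1.equals(db2)); // prints false
	}
}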


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273221007
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 Review comment:
   The word "generic" is used here to signify that the catalog is for Flink 
metadata, similar to GenericHiveMetastoreCatalog. The naming comes from a 
previous discussion with Timo.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12124) Security is not support dynamicProperties

2019-04-08 Thread Zhenqiu Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812739#comment-16812739
 ] 

Zhenqiu Huang commented on FLINK-12124:
---

[~zhouqi]
I have a similar requirement in our org. Will create a PR this weekend. 

> Security is not support dynamicProperties
> -
>
> Key: FLINK-12124
> URL: https://issues.apache.org/jira/browse/FLINK-12124
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2
>Reporter: zhouqi
>Assignee: Zhenqiu Huang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12124) Security is not support dynamicProperties

2019-04-08 Thread Zhenqiu Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang reassigned FLINK-12124:
-

Assignee: Zhenqiu Huang

> Security is not support dynamicProperties
> -
>
> Key: FLINK-12124
> URL: https://issues.apache.org/jira/browse/FLINK-12124
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2
>Reporter: zhouqi
>Assignee: Zhenqiu Huang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on a change in pull request #7809: [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlo…

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #7809: [FLINK-11710][tests] 
Refactor SimpleSlotProvider to TestingLogicalSlo…
URL: https://github.com/apache/flink/pull/7809#discussion_r273191251
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/TestingLogicalSlotProvider.java
 ##
 @@ -53,30 +52,31 @@
 /**
  * A testing utility slot provider that simply has a predefined pool of slots.
  */
-public class SimpleSlotProvider implements SlotProvider, SlotOwner {
+public class TestingLogicalSlotProvider implements SlotProvider, SlotOwner {
 
 Review comment:
   Also, here, is this renaming needed? At least, it could be shorter 
`TestingSlotProvider` because `SlotProvider` interface is already about 
`LogicalSlot`s.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7809: [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlo…

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #7809: [FLINK-11710][tests] 
Refactor SimpleSlotProvider to TestingLogicalSlo…
URL: https://github.com/apache/flink/pull/7809#discussion_r273189880
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SettableSlotContext.java
 ##
 @@ -26,9 +26,9 @@
 import org.apache.flink.util.Preconditions;
 
 /**
- * Simple implementation of the {@link SlotContext} interface for the legacy 
code.
+ * A settable implementation of the {@link SlotContext} interface.
  */
-public class SimpleSlotContext implements SlotContext {
+public class SettableSlotContext implements SlotContext {
 
 Review comment:
   why is it renamed to `SettableSlotContext`? In particular, what is meant by 
`settable`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7809: [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlo…

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #7809: [FLINK-11710][tests] 
Refactor SimpleSlotProvider to TestingLogicalSlo…
URL: https://github.com/apache/flink/pull/7809#discussion_r273189296
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/TestingLogicalSlotProvider.java
 ##
 @@ -53,30 +52,31 @@
 /**
  * A testing utility slot provider that simply has a predefined pool of slots.
  */
-public class SimpleSlotProvider implements SlotProvider, SlotOwner {
+public class TestingLogicalSlotProvider implements SlotProvider, SlotOwner {
 
private final Object lock = new Object();
 
	private final ArrayDeque<SlotContext> slots;

	private final HashMap<SlotRequestId, SlotContext> allocatedSlots;
 
-   public SimpleSlotProvider(JobID jobId, int numSlots) {
+   public TestingLogicalSlotProvider(JobID jobId, int numSlots) {
this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
}
 
-   public SimpleSlotProvider(JobID jobId, int numSlots, TaskManagerGateway 
taskManagerGateway) {
+   public TestingLogicalSlotProvider(JobID jobId, int numSlots, 
TaskManagerGateway taskManagerGateway) {
checkNotNull(jobId, "jobId");
checkArgument(numSlots >= 0, "numSlots must be >= 0");
 
this.slots = new ArrayDeque<>(numSlots);
 
for (int i = 0; i < numSlots; i++) {
-   SimpleSlotContext as = new SimpleSlotContext(
+   SettableSlotContext as = new SettableSlotContext(
 
 Review comment:
   Do we actually need to keep `SlotContext` in `slots`? Could it be just 
immediately `LogicalSlot`? That might simplify everything and then the 
`TestingLogicalSlot/SettableSlotContext` would not need to be created all the 
time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273172948
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   ok, sorry, I see your point now. Actually, the credit-based mode seems to 
belong to Network but not Netty. Maybe `NettyConnectionManager` should rather 
take one more constructor argument `isCreditBased`, and then we can keep it only 
in `NetworkEnvironmentConfiguration`. We should just make sure that 
`isCreditBased` is `false` when `NetworkEnvironmentConfiguration.nettyConfig = 
null`.


[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-04-08 Thread GitBox
piyushnarang commented on a change in pull request #8117: [FLINK-12115] 
[filesystems]: Add support for AzureFS
URL: https://github.com/apache/flink/pull/8117#discussion_r273171086
 
 

 ##
 File path: 
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract factory for AzureFS. Subclasses override to specify
+ * the correct scheme (wasb / wasbs).
+ */
+public abstract class AbstractAzureFSFactory implements FileSystemFactory {
+   private static final Logger LOG = 
LoggerFactory.getLogger(AzureFSFactory.class);
+
+   private Configuration flinkConfig;
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
 
 Review comment:
   I'm not sure I understand. The approach I've taken is basically copying 
relevant Flink config entries to the Hadoop config in the `AzureFileSystem` 
class if they match an `azure.` or `fs.azure.` prefix. Were you suggesting I 
base this off the S3 fs factory, or a different fs factory? (The various ones 
seem to be a bit different from each other on this.)
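   For reference, a rough self-contained sketch of that prefix-copying approach (the class and method names here are made up, not the actual AzureFileSystem code):

import org.apache.flink.configuration.Configuration;

public final class AzureConfigMirrorSketch {

	// Flink config keys with these prefixes are mirrored into the Hadoop config.
	private static final String[] AZURE_PREFIXES = {"azure.", "fs.azure."};

	public static org.apache.hadoop.conf.Configuration mirror(Configuration flinkConfig) {
		org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
		for (String key : flinkConfig.keySet()) {
			for (String prefix : AZURE_PREFIXES) {
				if (key.startsWith(prefix)) {
					String value = flinkConfig.getString(key, null);
					if (value != null) {
						hadoopConfig.set(key, value);
					}
					break;
				}
			}
		}
		return hadoopConfig;
	}
}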


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-04-08 Thread GitBox
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support 
for AzureFS
URL: https://github.com/apache/flink/pull/8117#issuecomment-480938035
 
 
   @Myasuka - thanks for taking a look. Updated to add this to opt.xml and some 
docs as well. Let me know if you think I should expand / clarify the docs or 
add some documentation to a specific location apart from this. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-04-08 Thread GitBox
piyushnarang commented on a change in pull request #8117: [FLINK-12115] 
[filesystems]: Add support for AzureFS
URL: https://github.com/apache/flink/pull/8117#discussion_r273168496
 
 

 ##
 File path: 
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract factory for AzureFS. Subclasses override to specify
+ * the correct scheme (wasb / wasbs).
+ */
+public abstract class AbstractAzureFSFactory implements FileSystemFactory {
+   private static final Logger LOG = 
LoggerFactory.getLogger(AzureFSFactory.class);
+
+   private Configuration flinkConfig;
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   checkNotNull(fsUri, "fsUri");
 
 Review comment:
   agree, updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-04-08 Thread GitBox
piyushnarang commented on a change in pull request #8117: [FLINK-12115] 
[filesystems]: Add support for AzureFS
URL: https://github.com/apache/flink/pull/8117#discussion_r273168590
 
 

 ##
 File path: 
flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static 
org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for Azure based
+ * file system.
+ */
+@RunWith(Parameterized.class)
+public class AzureFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite 
{
+
+   @Parameterized.Parameter
+   public String scheme;
+
+   @Parameterized.Parameters(name = "Scheme = {0}")
+   public static List<String> parameters() {
+   return Arrays.asList("wasb", "wasbs");
+   }
+
+   private static final String CONTAINER = 
System.getenv("ARTIFACTS_AZURE_CONTAINER");
+   private static final String ACCOUNT = 
System.getenv("ARTIFACTS_AZURE_STORAGE_ACCOUNT");
+   private static final String ACCESS_KEY = 
System.getenv("ARTIFACTS_AZURE_ACCESS_KEY");
+
+   private static final String TEST_DATA_DIR = "tests-" + 
UUID.randomUUID();
+
+   @BeforeClass
+   public static void checkCredentialsAndSetup() throws IOException {
+   // check whether credentials and container / account details 
exist
+   Assume.assumeTrue("Azure storage account not configured, 
skipping test...", ACCOUNT != null);
 
 Review comment:
   updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on issue #8090: [FLINK-12067][network] Refactor the 
constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#issuecomment-480923758
 
 
   @azagrebin thanks for the review again. I submitted a commit addressing the remaining comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7865) Remove predicate restrictions on TableFunction left outer join

2019-04-08 Thread Haisheng Yuan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812593#comment-16812593
 ] 

Haisheng Yuan commented on FLINK-7865:
--

[~till.rohrmann] [~fhueske] FYI

> Remove predicate restrictions on TableFunction left outer join
> --
>
> Key: FLINK-7865
> URL: https://issues.apache.org/jira/browse/FLINK-7865
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> To cover up the improper translation of lateral table left outer join 
> (CALCITE-2004), we have temporarily forbidden the predicates (except {{true}} 
> literal) in Table API (FLINK-7853) and SQL (FLINK-7854). Once the issue has 
> been fixed in Calcite, we should remove the restrictions. The tasks may 
> include removing Table API/SQL condition check, removing validation tests, 
> enabling integration tests, updating the documents, etc.
> See [this thread on Calcite dev 
> list|https://lists.apache.org/thread.html/16caeb8b1649c4da85f9915ea723c6c5b3ced0b96914cadc24ee4e15@%3Cdev.calcite.apache.org%3E]
>  for more information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r273129912
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
 ##
 @@ -32,22 +29,24 @@
  */
 public final class PluginUtils {
 
-   private static final Logger log = 
LoggerFactory.getLogger(PluginUtils.class);
-
private PluginUtils() {
throw new AssertionError("Singleton class.");
}
 
public static void 
initPluginManagerSingletonFromRootFolder(Optional<Path> pluginsRootPath) {
-   Collection<PluginDescriptor> pluginDescriptorsForDirectory = Collections.emptyList();
+   Collection<PluginDescriptor> pluginDescriptorsForDirectory;
+
if (pluginsRootPath.isPresent()) {
try {
pluginDescriptorsForDirectory =
new 
DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins();
-   } catch (IOException ioex) {
-   log.warn("Exception when locating plugins in 
root folder {}. Ignoring plugins.", pluginsRootPath, ioex);
+   } catch (IOException e) {
+   throw new IllegalStateException("Exception when 
trying to initialize plugin system.", e);
 
 Review comment:
   `IllegalStateException`? Maybe `FlinkException` or its subclass like 
`ConfigurationException`? I'm not sure if this qualifies as "illegal state"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r273130514
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
 ##
 @@ -255,14 +252,47 @@
 * @param config the configuration from where to fetch the parameter.
 */
public static void initialize(Configuration config) throws 
IllegalConfigurationException {
 
 Review comment:
   `@Deprecated`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r273133237
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
 ##
 @@ -78,19 +78,17 @@ public DirectoryBasedPluginDescriptorsFactory(Path 
pluginsRootDir) {
 
	private Optional<PluginDescriptor> createPluginDescriptorForSubDirectory(Path subDirectory) {
		Optional<URL[]> jarURLsFromDirectory = createJarURLsFromDirectory(subDirectory);
-   if (jarURLsFromDirectory.isPresent()) {
-   URL[] urls = jarURLsFromDirectory.get();
-   // we sort the urls for the benefit of having a 
stable/reproducible order of jars.
-   Arrays.sort(urls, Comparator.comparing(URL::toString));
-   //TODO: This class could be extended to parse 
exclude-pattern from a optional text files in the plugin directories.
-   return Optional.of(
-   new PluginDescriptor(
-   subDirectory.getFileName().toString(),
-   urls,
-   new String[0]));
-   } else {
-   return Optional.empty();
-   }
+   return jarURLsFromDirectory
+   .map((URL[] urls) -> {
+   Arrays.sort(urls, 
Comparator.comparing(URL::toString));
+   //TODO: This class could be extended to parse 
exclude-pattern from a optional text files in the plugin directories.
+   return Optional.of(
+   new PluginDescriptor(
+   
subDirectory.getFileName().toString(),
+   urls,
+   new String[0]));
+   })
+   .orElse(Optional.empty());
 
 Review comment:
   I think you could drop the `.orElse(...)` line.
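   To illustrate the suggestion, a small standalone sketch (the Descriptor class is a stand-in, not the real PluginDescriptor): `Optional.map` already re-wraps the result, so mapping straight to the value removes both the inner `Optional.of(...)` and the trailing `.orElse(Optional.empty())`.

import java.net.URL;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

class OptionalMapSketch {

	// Stand-in for the plugin descriptor built from a sub-directory.
	static final class Descriptor {
		final String name;
		final URL[] jarUrls;

		Descriptor(String name, URL[] jarUrls) {
			this.name = name;
			this.jarUrls = jarUrls;
		}
	}

	static Optional<Descriptor> describe(String name, Optional<URL[]> jarUrls) {
		// map() returns Optional.empty() for an empty input, so no orElse(...) is needed.
		return jarUrls.map(urls -> {
			Arrays.sort(urls, Comparator.comparing(URL::toString));
			return new Descriptor(name, urls);
		});
	}
}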


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r273132981
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
 ##
 @@ -76,16 +77,9 @@ private LambdaUtil() {
final ClassLoader cl,
final ThrowingRunnable r) throws E {
 
-   final Thread currentThread = Thread.currentThread();
-   final ClassLoader oldClassLoader = 
currentThread.getContextClassLoader();
-
-   try {
-   currentThread.setContextClassLoader(cl);
+   try (TemporaryClassLoaderContext tmpCl = new 
TemporaryClassLoaderContext(cl)) {
 
 Review comment:
   nice findings
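   As a standalone illustration of the pattern the diff above adopts (this sketch is not the actual TemporaryClassLoaderContext class): an AutoCloseable swaps the context class loader in its constructor and restores it in close(), so a try-with-resources block replaces the manual try/finally save-and-restore.

final class ClassLoaderScopeSketch implements AutoCloseable {

	private final ClassLoader previous;

	ClassLoaderScopeSketch(ClassLoader temporary) {
		this.previous = Thread.currentThread().getContextClassLoader();
		Thread.currentThread().setContextClassLoader(temporary);
	}

	@Override
	public void close() {
		// Restore the original context class loader even if the guarded code throws.
		Thread.currentThread().setContextClassLoader(previous);
	}
}

// Hypothetical usage, mirroring the method shown in the diff:
// try (ClassLoaderScopeSketch ignored = new ClassLoaderScopeSketch(temporaryClassLoader)) {
//     r.run();
// }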


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r273128415
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
 ##
 @@ -68,39 +63,38 @@ public DirectoryBasedPluginFinder(Path pluginsRootDir) {
 
@Override
public Collection findPlugins() throws IOException {
+
+   if (!Files.isDirectory(pluginsRootDir)) {
+   return Collections.emptyList();
 
 Review comment:
   question: Should we throw here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273130779
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
 ##
 @@ -71,15 +98,22 @@ class PlannerExpressionConverter private extends 
ApiExpressionVisitor[PlannerExp
 val extraNames = args
   .drop(2)
   .map(e => getValue[String](e))
-Alias(args.head, name, extraNames)
+if (extraNames.nonEmpty) {
+  TableAlias(args.head, name, extraNames)
 
 Review comment:
   I agree with Hequn. Maybe we can avoid this change.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273114839
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala
 ##
 @@ -37,6 +43,25 @@ import _root_.scala.collection.JavaConverters._
 class OperationTreeBuilder(private val tableEnv: TableEnvironment) {
 
   private val expressionBridge: ExpressionBridge[PlannerExpression] = 
tableEnv.expressionBridge
+  private val columnOperationsFactory = new ColumnOperationsFactory
 
 Review comment:
   This is not really a factory and can just be a util class with static methods, right?
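   A tiny sketch of the suggested shape (class and method names here are hypothetical): a final class with a private constructor and only static methods, rather than an instantiated factory.

import java.util.List;
import java.util.stream.Collectors;

final class ColumnOperationsSketch {

	private ColumnOperationsSketch() {
		// utility class, not meant to be instantiated
	}

	// Example static entry point; the real methods would build the column operations.
	static List<String> prefixColumnNames(List<String> names, String prefix) {
		return names.stream()
			.map(n -> prefix + "." + n)
			.collect(Collectors.toList());
	}
}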


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273101387
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/WrapInOrderRule.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FunctionDefinition;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_DESC;
+
+/**
+ * Makes sure that all expressions are wrapped in ordering expression. If the 
expression is either
+ * {@link 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_ASC} or
+ * {@link 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_DESC} it 
does nothing, otherwise
+ * it inserts {@link 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions#ORDER_ASC}.
+ */
+@Internal
+final class WrapInOrderRule implements ResolverRule {
 
 Review comment:
   This logic belongs more to the tree builder than expression resolver.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273049948
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273100967
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/StarReferenceFlatteningRule.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Replaces '*' with all available {@link 
org.apache.flink.table.expressions.FieldReferenceExpression}s from underlying
+ * inputs.
+ */
+@Internal
+final class StarReferenceFlatteningRule implements ResolverRule {
+
+   @Override
+   public List apply(List expression, 
ResolutionContext context) {
+   return expression.stream()
+   .flatMap(expr -> expr.accept(new 
FieldFlatteningVisitor(context)).stream())
+   .collect(Collectors.toList());
+   }
+
+   private static class FieldFlatteningVisitor extends 
ApiExpressionDefaultVisitor> {
+
+   private final ResolutionContext resolutionContext;
 
 Review comment:
   Maybe introduce a helper class to reduce a bit of code duplication for every 
rule visitor.
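   One possible shape for such a helper, sketched with stand-in types (the real base class would extend ApiExpressionDefaultVisitor and hold a ResolverRule.ResolutionContext): the shared field and constructor live in the base class, so each rule visitor only adds its visit logic.

// Stand-ins for the planner types referenced in the diff above.
interface ContextStandIn {}
interface ExpressionStandIn {}

// Shared base: stores the resolution context once for every rule visitor.
abstract class RuleVisitorSketch<T> {

	protected final ContextStandIn resolutionContext;

	protected RuleVisitorSketch(ContextStandIn resolutionContext) {
		this.resolutionContext = resolutionContext;
	}

	protected abstract T defaultMethod(ExpressionStandIn expression);
}

// A concrete rule visitor no longer repeats the field and constructor boilerplate.
final class FlatteningVisitorSketch extends RuleVisitorSketch<ExpressionStandIn> {

	FlatteningVisitorSketch(ContextStandIn resolutionContext) {
		super(resolutionContext);
	}

	@Override
	protected ExpressionStandIn defaultMethod(ExpressionStandIn expression) {
		return expression;
	}
}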


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273117051
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 ##
 @@ -18,22 +18,25 @@
 
 package org.apache.flink.table.plan.logical
 
+import java.util.Optional
+import java.util.{List => JList}
+
 import org.apache.flink.table.api.{BatchTableEnvironment, 
StreamTableEnvironment, TableEnvironment}
 import 
org.apache.flink.table.expressions.PlannerExpressionUtils.{isRowCountLiteral, 
isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.TypeCheckUtils.{isTimePoint, isLong}
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isLong, isTimePoint}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
 
 // 

 // Over windows
 // 

 
 case class LogicalOverWindow(
 
 Review comment:
   Make this file a POJO in `o.a.f.t.operations` in a separate commit? We need 
some representation of windows as output of the API for the planner. It must not 
necessarily contain expressions only, or only resolved expressions. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273119314
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
 ##
 @@ -47,67 +47,60 @@ class CalcValidationTest extends TableTestBase {
 .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window 
reference
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testAddColumnsWithAgg(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.addColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testAddOrReplaceColumnsWithAgg(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.addOrReplaceColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsWithAgg(): Unit = {
   val util = streamTestUtil()
   val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
   tab.renameColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsWithoutAlias(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.renameColumns('a)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsWithFunctallCall(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.renameColumns('a + 1  as 'a2)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRenameColumnsNotExist(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.renameColumns('e as 'e2)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testDropColumnsWithAgg(): Unit = {
 val util = streamTestUtil()
 val tab = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 tab.dropColumns('a.sum)
   }
 
-  @Test(expected = classOf[TableException])
 
 Review comment:
   Isn't this test still valid?




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273052867
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/FieldReferenceLookup.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.lookups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Provides a way to look up field reference by the name of the field.
+ */
+@Internal
+public interface FieldReferenceLookup {
 
 Review comment:
   There is just one implementation. I would prefer merging this internal 
interface and class.
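
A rough sketch of the merged class, assuming `FieldReferenceExpression#getName()` 
as the lookup key and method names mirroring the javadoc (both assumptions, since 
the interface body is cut off in the quote):

    package org.apache.flink.table.expressions.lookups;

    import org.apache.flink.annotation.Internal;
    import org.apache.flink.table.expressions.FieldReferenceExpression;

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    /** Field lookup as a single final class instead of an interface plus its sole implementation. */
    @Internal
    public final class FieldReferenceLookup {

        // insertion order is kept so that expanding '*' preserves the input field order
        private final Map<String, FieldReferenceExpression> fieldsByName = new LinkedHashMap<>();

        public FieldReferenceLookup(List<FieldReferenceExpression> fields) {
            for (FieldReferenceExpression field : fields) {
                fieldsByName.put(field.getName(), field);
            }
        }

        /** Looks up a field reference by the name of the field, if present. */
        public Optional<FieldReferenceExpression> lookupField(String name) {
            return Optional.ofNullable(fieldsByName.get(name));
        }

        /** Returns all fields of the underlying input, e.g. for expanding a '*' reference. */
        public List<FieldReferenceExpression> getAllFields() {
            return new ArrayList<>(fieldsByName.values());
        }
    }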




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273052519
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273108596
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala
 ##
 @@ -18,16 +18,22 @@
 
 package org.apache.flink.table.plan
 
 Review comment:
   nit: move to `o.a.f.t.operations`




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273117168
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ##
 @@ -41,36 +41,17 @@ import org.apache.flink.table.validate.{ValidationFailure, 
ValidationSuccess}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.tools.nsc.interpreter.JList
 
 Review comment:
   Wrong import: `scala.tools.nsc.interpreter.JList` should presumably be the 
`java.util.{List => JList}` alias used elsewhere in this PR.




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273049675
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273102653
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+
+/**
+ * Contains instances of {@link ResolverRule}.
+ */
+@PublicEvolving
+public final class ResolverRules {
+
+   /**
+* Rule that resolves flatten call. See {@link FlattenCallRule} for 
details.
+*/
+   public static final ResolverRule FLATTEN_CALL = new FlattenCallRule();
+
+   /**
+* Resolves {@link UnresolvedReferenceExpression}. See {@link 
ReferenceResolverRule} for details.
+*/
+   public static final ResolverRule FIELD_RESOLVE = new 
ReferenceResolverRule();
+
+   /**
+* Resolves call based on argument types. See {@link 
ResolveCallByArgumentsRule} for details.
+*/
+   public static final ResolverRule RESOLVE_CALL = new 
ResolveCallByArgumentsRule();
+
+   /**
+* Concatenates over aggregations with corresponding over window. See 
{@link OverWindowResolverRule} for details.
+*/
+   public static final ResolverRule OVER_WINDOWS = new 
OverWindowResolverRule();
+
+   /**
+* Resolves '*' expressions to corresponding fields of inputs. See 
{@link StarReferenceFlatteningRule} for details.
+*/
+   public static final ResolverRule FLATTEN_STAR_REFERENCE = new 
StarReferenceFlatteningRule();
+
+   /*
+   NON DEFAULT RULES
+*/
+
+   /**
+* Used in sort operation. It assures expression is wrapped in ordering 
expression. See {@link WrapInOrderRule}
+* for details.
+*/
+   public static final ResolverRule WRAP_IN_ORDER = new WrapInOrderRule();
+
+   /**
+* Used in projection operation. It derives name for expression. See 
{@link NameExpressionRule} for details.
+*/
+   public static final ResolverRule NAME_EXPRESSION = new 
NameExpressionRule();
 
 Review comment:
   This rule is more projection-related and should be put into the projection 
builder.




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273070489
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.CurrentRange;
+import org.apache.flink.table.expressions.CurrentRow;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Joins call to {@link BuiltInFunctionDefinitions#OVER} with corresponding 
over window
+ * and creates a fully resolved over aggregation.
+ */
+@Internal
+final class OverWindowResolverRule implements ResolverRule {
+
+   @Override
+   public List apply(List expression, 
ResolutionContext context) {
+   return expression.stream()
+   .map(expr -> expr.accept(new 
ExpressionResolverVisitor(context)))
+   .collect(Collectors.toList());
+   }
+
+   private class ExpressionResolverVisitor extends 
ApiExpressionDefaultVisitor {
+
+   private final ResolutionContext context;
+
+   ExpressionResolverVisitor(ResolutionContext context) {
+   this.context = context;
+   }
+
+   @Override
+   public Expression visitCall(CallExpression call) {
+
+   if (call.getFunctionDefinition() == 
BuiltInFunctionDefinitions.OVER) {
+   List children = call.getChildren();
+   Expression alias = children.get(1);
+
+   LogicalOverWindow referenceWindow = 
context.getOverWindow(alias)
+   .orElseThrow(() -> new 
ValidationException("Could not resolve over call."));
+
+   Expression following = 
calculateOverWindowFollowing(referenceWindow);
+   List newArgs = new 
ArrayList<>(asList(
+   children.get(0),
+   referenceWindow.orderBy(),
+   referenceWindow.preceding(),
+   following));
+
+   newArgs.addAll(referenceWindow.partitionBy());
+   return new 
CallExpression(call.getFunctionDefinition(), newArgs);
+   } else {
+   return new CallExpression(
+   call.getFunctionDefinition(),
+   call.getChildren().stream().map(expr -> 
expr.accept(this)).collect(toList()));
+   }
+   }
+
+   private Expression 
calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
+   return referenceWindow.following().orElseGet(() -> {
+   PlannerExpression preceding = 
context.bridge(referenceWindow.preceding());
+   if (preceding.resultType() instanceof 
RowIntervalTypeInfo) {
+   return new CurrentRow();
 
 Review comment:
   Change this to a call instead of constructing planner expressions directly.
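
Sketched against the method quoted above, that could look roughly like the 
following; `CURRENT_ROW` and `CURRENT_RANGE` are assumed to exist as 
`BuiltInFunctionDefinitions` entries (if they do not on this branch, they would 
have to be introduced first):

    private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
        return referenceWindow.following().orElseGet(() -> {
            PlannerExpression preceding = context.bridge(referenceWindow.preceding());
            // build unresolved calls instead of instantiating planner expressions directly
            if (preceding.resultType() instanceof RowIntervalTypeInfo) {
                return new CallExpression(BuiltInFunctionDefinitions.CURRENT_ROW, new ArrayList<>());
            } else {
                return new CallExpression(BuiltInFunctionDefinitions.CURRENT_RANGE, new ArrayList<>());
            }
        });
    }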



[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273106216
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -42,8 +42,11 @@ import org.apache.flink.table.functions.{AggregateFunction, 
ScalarFunction, Tabl
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
+import java.util.{Optional, List => JList}
 
 Review comment:
   Unrelated changes?




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273052322
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 ##
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.GroupWindow;
+import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
+import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.lookups.OperationFieldLookup;
+import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.rules.ResolverRule;
+import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.operations.TableOperation;
+import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.plan.logical.LogicalWindow;
+import org.apache.flink.table.plan.logical.SessionGroupWindow;
+import org.apache.flink.table.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.plan.logical.TumblingGroupWindow;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import scala.Some;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
+ * or calls such as {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * The default set of rules ({@link 
ExpressionResolverBuilder#getDefaultRules()}) will resolve following references:
+ * 
+ * flatten '*' to all fields of underlying inputs
+ * join over aggregates with corresponding over windows into a single 
resolved call
+ * resolve remaining unresolved references to fields, tables or local 
references
+ * replace call to {@link BuiltInFunctionDefinitions#FLATTEN}
+ * performs call arguments types validation and inserts additional 
casts if possible
+ * 
+ */
+public class ExpressionResolver {
+
+   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
+
+   private final FieldReferenceLookup fieldLookup;
+
+   private final TableReferenceLookup tableLookup;
+
+   //TODO change to LocalReferenceLookup, once we don't need to resolve 
fields to create LocalReferenceExpression
+   private final Map localReferences = 
new HashMap<>();
+
+   private final Map overWindows;
+
+   private final Function, List> 
resolveFunction;
+
+   private ExpressionResolver(
+   TableReferenceLookup tableLookup,
+   FieldReferenceLookup fieldLookup,
+   List overWindows,
+   @Nullable GroupWindow groupWindow,
+   List rules) {
+   this.tableLookup = Preconditions.checkNotNull(tableLookup);
+   this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
+   this.resolveFunction = concatenateRules(rules);
+
+   this.overWindows = prepareOverWindows(overWindows);
+   prepareLocalReferencesFromGroupWindows(groupWindow);
+   }
+
+   /**
+* Creates a builder for {@link ExpressionResolver}. One can add 
additional properties to the resolver
+* like e.g. {@link GroupWindow} or {@link OverWindow}. You can also 

[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r273120072
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala
 ##
 @@ -148,6 +148,6 @@ class OverWindowValidationTest extends TableTestBase {
 
 table
 .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
-.select('c, 'a.count over 'w, 'w.start, 'w.end)
+.select('c, 'a.count over 'w, 'w.start + 1, 'w.end + 2)
 
 Review comment:
   Unrelated change?



