[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279511#comment-15279511
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1980

[FLINK-3780] [gelly] Jaccard Similarity

The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common).
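For reference, with N(u) denoting the neighbor set of vertex u, the score is the
standard Jaccard index (notation ours, not from the PR):

    J(u, v) = |N(u) ∩ N(v)| / |N(u) ∪ N(v)|

For example, vertices with neighbor sets {a, b, c} and {b, c, d} score 2/4 = 0.5.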

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 3780_jaccard_similarity

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1980.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1980


commit 8603c7a9a2de9f0071d8c677a36a5c300d35f40a
Author: Greg Hogan 
Date:   2016-05-09T18:45:15Z

[FLINK-3780] [gelly] Jaccard Similarity

The Jaccard Index measures the similarity between vertex neighborhoods.
Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are
common).




> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity
> scores. This algorithm is similar to {{TriangleListing}}, but instead of
> joining two-paths against an edge list, we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}}, which
> relies on {{Graph.getTriplets()}} and so only computes similarity scores for
> neighbors, not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores, such as
> Adamic-Adar similarity, where the sum of endpoint degrees is replaced by the
> degree of the middle vertex.
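As a rough, non-Gelly illustration of the two-path counting described above
(plain Java over in-memory adjacency sets; all names here are ours, not from
the pull request):

{code}
import java.util.*;

public class JaccardTwoPaths {

    /** Jaccard scores for all vertex pairs with at least one common neighbor. */
    public static Map<String, Double> scores(Map<Integer, Set<Integer>> adj) {
        // Two-path counting: every middle vertex v contributes one common
        // neighbor to each pair (u, w) of its neighbors.
        Map<String, Integer> shared = new HashMap<>();
        for (Set<Integer> neighbors : adj.values()) {
            List<Integer> n = new ArrayList<>(neighbors);
            for (int i = 0; i < n.size(); i++) {
                for (int j = i + 1; j < n.size(); j++) {
                    String pair = Math.min(n.get(i), n.get(j)) + "," + Math.max(n.get(i), n.get(j));
                    shared.merge(pair, 1, Integer::sum);
                }
            }
        }
        // score = |common| / (deg(u) + deg(w) - |common|); for Adamic-Adar the
        // per-pair count would instead accumulate contributions based on the
        // degree of the middle vertex, as the description notes.
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Integer> e : shared.entrySet()) {
            String[] uw = e.getKey().split(",");
            int du = adj.get(Integer.parseInt(uw[0])).size();
            int dw = adj.get(Integer.parseInt(uw[1])).size();
            result.put(e.getKey(), e.getValue() / (double) (du + dw - e.getValue()));
        }
        return result;
    }
}
{code}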



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-10 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1980

[FLINK-3780] [gelly] Jaccard Similarity

The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 3780_jaccard_similarity

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1980.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1980


commit 8603c7a9a2de9f0071d8c677a36a5c300d35f40a
Author: Greg Hogan 
Date:   2016-05-09T18:45:15Z

[FLINK-3780] [gelly] Jaccard Similarity

The Jaccard Index measures the similarity between vertex neighborhoods.
Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are
common).




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3897) why does jobmanager process has too many ForkJoinTask object

2016-05-10 Thread ZhengBowen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhengBowen updated FLINK-3897:
--
Description: 
jmap -histo 21415 |head -10 

 num     #instances       #bytes  class name
----------------------------------------------
   1:      18458230   5059213664  [C
   2:         29703   1947328680  [Lscala.concurrent.forkjoin.ForkJoinTask;
   3:        993331   1361835272  [B
   4:      35228671   1127317472  akka.dispatch.AbstractNodeQueue$Node
   5:      23123301    924932040  akka.actor.LightArrayRevolverScheduler$TaskHolder
   6:      15217224    730426752  java.util.HashMap$Entry
   7:      16863612    539635584  java.lang.String

---
The ForkJoinTask arrays take up nearly 2 GB of memory, and the memory used has
been growing.

Why?

  was:
jmap -histo 21415 |head -10 

 num     #instances       #bytes  class name
----------------------------------------------
   1:      18458230   5059213664  [C
   2:         29703   1947328680  [Lscala.concurrent.forkjoin.ForkJoinTask;
   3:        993331   1361835272  [B
   4:      35228671   1127317472  akka.dispatch.AbstractNodeQueue$Node
   5:      23123301    924932040  akka.actor.LightArrayRevolverScheduler$TaskHolder
   6:      15217224    730426752  java.util.HashMap$Entry
   7:      16863612    539635584  java.lang.String


> why does jobmanager process has too many ForkJoinTask object
> 
>
> Key: FLINK-3897
> URL: https://issues.apache.org/jira/browse/FLINK-3897
> Project: Flink
>  Issue Type: Improvement
>Reporter: ZhengBowen
>
> jmap -histo 21415 |head -10 
>  num     #instances       #bytes  class name
> ----------------------------------------------
>    1:      18458230   5059213664  [C
>    2:         29703   1947328680  [Lscala.concurrent.forkjoin.ForkJoinTask;
>    3:        993331   1361835272  [B
>    4:      35228671   1127317472  akka.dispatch.AbstractNodeQueue$Node
>    5:      23123301    924932040  akka.actor.LightArrayRevolverScheduler$TaskHolder
>    6:      15217224    730426752  java.util.HashMap$Entry
>    7:      16863612    539635584  java.lang.String
> ---
> The ForkJoinTask arrays take up nearly 2 GB of memory, and the memory used has
> been growing.
> Why?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3897) why does jobmanager process has too many ForkJoinTask object

2016-05-10 Thread ZhengBowen (JIRA)
ZhengBowen created FLINK-3897:
-

 Summary: why does jobmanager process has too many ForkJoinTask 
object
 Key: FLINK-3897
 URL: https://issues.apache.org/jira/browse/FLINK-3897
 Project: Flink
  Issue Type: Improvement
Reporter: ZhengBowen


jmap -histo 21415 |head -10 

 num     #instances       #bytes  class name
----------------------------------------------
   1:      18458230   5059213664  [C
   2:         29703   1947328680  [Lscala.concurrent.forkjoin.ForkJoinTask;
   3:        993331   1361835272  [B
   4:      35228671   1127317472  akka.dispatch.AbstractNodeQueue$Node
   5:      23123301    924932040  akka.actor.LightArrayRevolverScheduler$TaskHolder
   6:      15217224    730426752  java.util.HashMap$Entry
   7:      16863612    539635584  java.lang.String



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3891) Add a class containing all supported Table API types

2016-05-10 Thread Yijie Shen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279438#comment-15279438
 ] 

Yijie Shen commented on FLINK-3891:
---

I think it will be great to implement this; looking forward to the PR :)

> Add a class containing all supported Table API types
> 
>
> Key: FLINK-3891
> URL: https://issues.apache.org/jira/browse/FLINK-3891
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Minor
>
> In order to make working with the Table API easier, it would be great to have
> a class that contains the supported types.
> Such that an expression could look like:
> {code}
> .select(42.cast(TableType.INT), 43.cast(TableType.DECIMAL))
> {code}
> The constants would map to the original TypeInformation object (in 
> BasicTypeInfo, SqlTimeTypeInfo, etc.).
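A minimal sketch of what such a constants class could look like ({{TableType}}
and its field names are the proposal, not existing API; the right-hand sides
are existing Flink type infos):

{code}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TableType {
    public static final TypeInformation<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
    public static final TypeInformation<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
    public static final TypeInformation<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
    public static final TypeInformation<java.math.BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO;
    public static final TypeInformation<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
    public static final TypeInformation<java.sql.Timestamp> TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
}
{code}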



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton

2016-05-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279400#comment-15279400
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3852:


Hi Mark,

I think simply making the StreamingJob skeleton the main class doesn't match
what this issue is aiming for.

I see 2 approaches here:

1. BatchJob skeleton and StreamingJob skeleton within the same quickstart module
(i.e. Batch & Streaming Java / Batch & Streaming Scala).
In the javadoc within each skeleton, you can guide the user to use
{noformat}./bin/flink run -c <classname> <jarfile>{noformat} as instructed in
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html.
Instead of defining a mainClass in the shade plugin, comment out the transformers
for the mainClass setting, with instructions that the user can add this back
if they decide to simply use {noformat}./bin/flink run <jarfile>{noformat} with
either one of the skeletons as the main class entry point.

2. Have 4 separate quickstarts, i.e. Batch Java / Batch Scala / Streaming Java 
/ Streaming Scala.
I like the first approach better, since it also has a natural way of letting 
the user know that there are different submission options for a packaged Flink 
jar job.

> Use a StreamExecutionEnvironment in the quickstart job skeleton
> ---
>
> Key: FLINK-3852
> URL: https://issues.apache.org/jira/browse/FLINK-3852
> Project: Flink
>  Issue Type: Task
>  Components: Quickstarts
>Reporter: Robert Metzger
>  Labels: starter
>
> The Job skeleton created by the maven archetype "quickstart" is still setting 
> up an ExecutionEnvironment, not a StreamExecutionEnvironment.
> These days, most users are using Flink for streaming, so we should reflect 
> that in the quickstart as well.
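For reference, a minimal streaming skeleton along the lines this issue asks for
(class and job names are placeholders):

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // streaming counterpart of the ExecutionEnvironment used today
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the pipeline here, e.g. env.addSource(...).map(...).print();

        env.execute("Flink Streaming Job");
    }
}
{code}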



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton

2016-05-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279399#comment-15279399
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3852:


Hi Mark,

I think simply making the StreamingJob skeleton the main class doesn't match
what this issue is aiming for.

I see 2 approaches here:

1. BatchJob skeleton and StreamingJob skeleton within the same quickstart module
(i.e. Batch & Streaming Java / Batch & Streaming Scala).
In the javadoc within each skeleton, you can guide the user to use
`./bin/flink run -c <classname> <jarfile>` as instructed in
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html.
Instead of defining a mainClass in the shade plugin, comment out the transformers
for the mainClass setting, with instructions that the user can add this back
if they decide to simply use `./bin/flink run <jarfile>` with either one
of the skeletons as the main class entry point.

2. Have 4 separate quickstarts, i.e. Batch Java / Batch Scala / Streaming Java 
/ Streaming Scala.

I like the first approach better, since it also has a natural way of letting 
the user know that there are different submission options for a packaged Flink 
jar job.

> Use a StreamExecutionEnvironment in the quickstart job skeleton
> ---
>
> Key: FLINK-3852
> URL: https://issues.apache.org/jira/browse/FLINK-3852
> Project: Flink
>  Issue Type: Task
>  Components: Quickstarts
>Reporter: Robert Metzger
>  Labels: starter
>
> The Job skeleton created by the maven archetype "quickstart" is still setting 
> up an ExecutionEnvironment, not a StreamExecutionEnvironment.
> These days, most users are using Flink for streaming, so we should reflect 
> that in the quickstart as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton

2016-05-10 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3852:
---
Comment: was deleted

(was: Hi Mark,

I think simply making the StreamingJob skeleton the main class doesn't match
what this issue is aiming for.

I see 2 approaches here:

1. BatchJob skeleton and StreamingJob skeleton within the same quickstart module
(i.e. Batch & Streaming Java / Batch & Streaming Scala).
In the javadoc within each skeleton, you can guide the user to use
`./bin/flink run -c <classname> <jarfile>` as instructed in
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html.
Instead of defining a mainClass in the shade plugin, comment out the transformers
for the mainClass setting, with instructions that the user can add this back
if they decide to simply use `./bin/flink run <jarfile>` with either one
of the skeletons as the main class entry point.

2. Have 4 separate quickstarts, i.e. Batch Java / Batch Scala / Streaming Java 
/ Streaming Scala.

I like the first approach better, since it also has a natural way of letting 
the user know that there are different submission options for a packaged Flink 
jar job.)

> Use a StreamExecutionEnvironment in the quickstart job skeleton
> ---
>
> Key: FLINK-3852
> URL: https://issues.apache.org/jira/browse/FLINK-3852
> Project: Flink
>  Issue Type: Task
>  Components: Quickstarts
>Reporter: Robert Metzger
>  Labels: starter
>
> The Job skeleton created by the maven archetype "quickstart" is still setting 
> up an ExecutionEnvironment, not a StreamExecutionEnvironment.
> These days, most users are using Flink for streaming, so we should reflect 
> that in the quickstart as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Build a flink-connector-kafka-0.9 test-jar

2016-05-10 Thread scosenza
Github user scosenza commented on the pull request:

https://github.com/apache/flink/pull/1972#issuecomment-218318387
  
Hi Robert,

Typically, a test-jar would not contain JUnit test classes unless they are
designed to be extended. On a related note, it's also good to avoid including
logback-test.xml inside your test-jars, as this leads to the following warnings
when others (who are also using logback) depend on your test-jar:
```
16:01:09,771 |-INFO in ch.qos.logback.classic.LoggerContext[default] - 
Found resource [logback-test.xml] at 
[file:/Users/.../target/test-classes/logback-test.xml]
16:01:09,772 |-WARN in ch.qos.logback.classic.LoggerContext[default] - 
Resource [logback-test.xml] occurs multiple times on the classpath.
16:01:09,772 |-WARN in ch.qos.logback.classic.LoggerContext[default] - 
Resource [logback-test.xml] occurs at 
[jar:file:/Users/scosenza/.m2/repository/org/apache/flink/flink-connector-kafka-base_2.11/1.0.2/flink-connector-kafka-base_2.11-1.0.2-tests.jar!/logback-test.xml]
16:01:09,772 |-WARN in ch.qos.logback.classic.LoggerContext[default] - 
Resource [logback-test.xml] occurs at 
[jar:file:/Users/scosenza/.m2/repository/org/apache/flink/flink-core/1.0.2/flink-core-1.0.2-tests.jar!/logback-test.xml]
16:01:09,772 |-WARN in ch.qos.logback.classic.LoggerContext[default] - 
Resource [logback-test.xml] occurs at 
[file:/Users/scosenza/.../target/test-classes/logback-test.xml]
16:01:09,772 |-WARN in ch.qos.logback.classic.LoggerContext[default] - 
Resource [logback-test.xml] occurs at 
[jar:file:/Users/scosenza/.m2/repository/org/apache/flink/flink-runtime_2.11/1.0.2/flink-runtime_2.11-1.0.2-tests.jar!/logback-test.xml]
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton

2016-05-10 Thread Mark Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279155#comment-15279155
 ] 

Mark Reddy commented on FLINK-3852:
---

I can take this.

One quick question after taking an initial look: the shade plugin specifies a
mainClass; if we create a BatchJob and a StreamingJob skeleton, would I be
correct in saying that the main class for the packaged jar would be
StreamingJob?

If so, for clarity, would it be best to remove the following javadoc from the
BatchJob and only have it in the StreamingJob?
{noformat}
 * You can also generate a .jar file that you can submit on your Flink
 * cluster.
 * Just type
 *  mvn clean package
 * in the projects root directory.
 * You will find the jar in
 *  target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
{noformat}

> Use a StreamExecutionEnvironment in the quickstart job skeleton
> ---
>
> Key: FLINK-3852
> URL: https://issues.apache.org/jira/browse/FLINK-3852
> Project: Flink
>  Issue Type: Task
>  Components: Quickstarts
>Reporter: Robert Metzger
>  Labels: starter
>
> The Job skeleton created by the maven archetype "quickstart" is still setting 
> up an ExecutionEnvironment, not a StreamExecutionEnvironment.
> These days, most users are using Flink for streaming, so we should reflect 
> that in the quickstart as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279033#comment-15279033
 ] 

ASF GitHub Bot commented on FLINK-3758:
---

Github user knaufk closed the pull request at:

https://github.com/apache/flink/pull/1979


> Add possibility to register accumulators in custom triggers
> ---
>
> Key: FLINK-3758
> URL: https://issues.apache.org/jira/browse/FLINK-3758
> Project: Flink
>  Issue Type: Improvement
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>
> For monitoring purposes it would be nice to be able to use accumulators in
> custom trigger functions.
> Basically, the trigger context could just expose {{getAccumulator}} of
> {{RuntimeContext}}, or does this create problems I am not aware of?
> Adding accumulators in a trigger function is more difficult, I think, but
> that's not really necessary, as the accumulator could just be added in some
> other upstream operator.
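To make the proposal concrete, a sketch of a trigger that counts its firings
({{getOrDefaultAccumulator}} on the context is the API proposed here and in PR
#1979, not existing Flink API; the rest is standard Trigger boilerplate):

{code}
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountingTrigger<W extends Window> extends Trigger<Object, W> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // hypothetical call, as proposed in this issue: register-or-get an accumulator
        LongCounter firings = ctx.getOrDefaultAccumulator("trigger-firings", new LongCounter());
        firings.add(1L);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
}
{code}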



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3758] Add possibility to register accum...

2016-05-10 Thread knaufk
Github user knaufk closed the pull request at:

https://github.com/apache/flink/pull/1979


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3758] Add possibility to register accum...

2016-05-10 Thread knaufk
GitHub user knaufk reopened a pull request:

https://github.com/apache/flink/pull/1979

[FLINK-3758] Add possibility to register accumulators in custom triggers

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knaufk/flink FLINK-3758

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1979.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1979


commit d093fdd6bd22382cde3ae202adfd6d407caac0e8
Author: Konstantin Knauf 
Date:   2016-05-03T20:45:48Z

Added getOrDefaultAccumulator to RuntimeContext

commit 8fc4d39bbb5c76a1b0cc0a4e93522e59fb2135bd
Author: Konstantin Knauf 
Date:   2016-05-03T20:46:17Z

Exposed getOrDefaultAccumulator in WindowContext

commit 08df4fb00a500f8a131b4364f94c264c0b51e6b8
Author: Konstantin Knauf 
Date:   2016-05-05T16:42:10Z

Added simple test for Accumulators in Triggers




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279035#comment-15279035
 ] 

ASF GitHub Bot commented on FLINK-3758:
---

GitHub user knaufk reopened a pull request:

https://github.com/apache/flink/pull/1979

[FLINK-3758] Add possibility to register accumulators in custom triggers

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knaufk/flink FLINK-3758

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1979.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1979


commit d093fdd6bd22382cde3ae202adfd6d407caac0e8
Author: Konstantin Knauf 
Date:   2016-05-03T20:45:48Z

Added getOrDefaultAccumulator to RuntimeContext

commit 8fc4d39bbb5c76a1b0cc0a4e93522e59fb2135bd
Author: Konstantin Knauf 
Date:   2016-05-03T20:46:17Z

Exposed getOrDefaultAccumulator in WindowContext

commit 08df4fb00a500f8a131b4364f94c264c0b51e6b8
Author: Konstantin Knauf 
Date:   2016-05-05T16:42:10Z

Added simple test for Accumulators in Triggers




> Add possibility to register accumulators in custom triggers
> ---
>
> Key: FLINK-3758
> URL: https://issues.apache.org/jira/browse/FLINK-3758
> Project: Flink
>  Issue Type: Improvement
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>
> For monitoring purposes it would be nice to be able to use accumulators in
> custom trigger functions.
> Basically, the trigger context could just expose {{getAccumulator}} of
> {{RuntimeContext}}, or does this create problems I am not aware of?
> Adding accumulators in a trigger function is more difficult, I think, but
> that's not really necessary, as the accumulator could just be added in some
> other upstream operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [Flink 3758] Add possibility to register accum...

2016-05-10 Thread knaufk
GitHub user knaufk opened a pull request:

https://github.com/apache/flink/pull/1979

[Flink 3758] Add possibility to register accumulators in custom triggers

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knaufk/flink FLINK-3758

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1979.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1979


commit d093fdd6bd22382cde3ae202adfd6d407caac0e8
Author: Konstantin Knauf 
Date:   2016-05-03T20:45:48Z

Added getOrDefaultAccumulator to RuntimeContext

commit 8fc4d39bbb5c76a1b0cc0a4e93522e59fb2135bd
Author: Konstantin Knauf 
Date:   2016-05-03T20:46:17Z

Exposed getOrDefaultAccumulator in WindowContext

commit 08df4fb00a500f8a131b4364f94c264c0b51e6b8
Author: Konstantin Knauf 
Date:   2016-05-05T16:42:10Z

Added simple test for Accumulators in Triggers




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3882) Errors in sample Java code for the Elasticsearch 2.x sink

2016-05-10 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3882.

   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed with 43bd6f6e4020d4102c34c5874ddfa850a281c8c8

> Errors in sample Java code for the Elasticsearch 2.x sink
> -
>
> Key: FLINK-3882
> URL: https://issues.apache.org/jira/browse/FLINK-3882
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Mark Reddy
>Priority: Trivial
> Fix For: 1.1.0
>
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> The sample Java code for the Elasticsearch 2.x sink doc [1] has numerous
> errors in it, which seem to result from porting the Scala example to
> Java.
> e.g.
> {code}
>   public IndexRequest createIndexRequest(String element): IndexRequest = {
> Map json = new HashMap<>()
> json.put("data", element)
> {code}
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html
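For comparison, a corrected Java version of that snippet would read roughly as
follows (index and type names are placeholders):

{code}
public IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);
    // Requests is org.elasticsearch.client.Requests (Elasticsearch 2.x client)
    return Requests.indexRequest()
        .index("my-index")
        .type("my-type")
        .source(json);
}
{code}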



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3821) Reduce Guava usage in flink-java

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278823#comment-15278823
 ] 

ASF GitHub Bot commented on FLINK-3821:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1938


> Reduce Guava usage in flink-java
> 
>
> Key: FLINK-3821
> URL: https://issues.apache.org/jira/browse/FLINK-3821
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3842] [table] Fix handling null record/...

2016-05-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1974


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3821) Reduce Guava usage in flink-java

2016-05-10 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3821.

Resolution: Implemented

Implemented with 15f52112af1e3cca86094b5c6c1ef31157e073df

> Reduce Guava usage in flink-java
> 
>
> Key: FLINK-3821
> URL: https://issues.apache.org/jira/browse/FLINK-3821
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3842) Fix handling null record/row in generated code

2016-05-10 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3842.

   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed with 08e80546e20a24bdf204dec485d3ade83cf7804c
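For context, the missing guard is of the following shape, shown here with a
hand-written FlatJoinFunction rather than the generated code:

{code}
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class NullSafeJoin
        implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String> {

    @Override
    public void join(Tuple2<Integer, String> left, Tuple2<Integer, String> right, Collector<String> out) {
        // In an outer join one side may legitimately be null; the generated
        // code dereferenced it unconditionally, producing the NPEs below.
        String l = (left == null) ? "null" : left.f1;
        String r = (right == null) ? "null" : right.f1;
        out.collect(l + "," + r);
    }
}
{code}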

> Fix handling null record/row in generated code
> --
>
> Key: FLINK-3842
> URL: https://issues.apache.org/jira/browse/FLINK-3842
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Dawid Wysakowicz
>Assignee: Timo Walther
> Fix For: 1.1.0
>
>
> Handling null record/rows is needed to implement features like outer joins.
> Right now an exception is thrown:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
>   at DataSetJoinRule$54.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:152)
>   at 
> org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:154)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>   at java.lang.Thread.run(Thread.java:745)
> java.lang.NullPointerException
>   at DataSetJoinRule$116.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.executeOnCollections(OuterJoinOperatorBase.java:113)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeBinaryOperator(CollectionExecutor.java:280)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:220)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:146)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:179)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:112)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:546)
>   at 
> org.apache.flink.api.scala.table.test.JoinITCase.testLeftJoinWithMultipleKeys(JoinITCase.scala:291)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 

[jira] [Commented] (FLINK-3882) Errors in sample Java code for the Elasticsearch 2.x sink

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278822#comment-15278822
 ] 

ASF GitHub Bot commented on FLINK-3882:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1971


> Errors in sample Java code for the Elasticsearch 2.x sink
> -
>
> Key: FLINK-3882
> URL: https://issues.apache.org/jira/browse/FLINK-3882
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Mark Reddy
>Priority: Trivial
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> The sample Java code for the Elasticsearch 2.x sink doc [1] has numerous
> errors in it, which seem to result from porting the Scala example to
> Java.
> e.g.
> {code}
>   public IndexRequest createIndexRequest(String element): IndexRequest = {
> Map json = new HashMap<>()
> json.put("data", element)
> {code}
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3821] Reduce Guava usage in flink-java

2016-05-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1938


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3842) Fix handling null record/row in generated code

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278824#comment-15278824
 ] 

ASF GitHub Bot commented on FLINK-3842:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1974


> Fix handling null record/row in generated code
> --
>
> Key: FLINK-3842
> URL: https://issues.apache.org/jira/browse/FLINK-3842
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Dawid Wysakowicz
>Assignee: Timo Walther
>
> Handling null record/rows is needed to implement features like outer joins.
> Right now an exception is thrown:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
>   at DataSetJoinRule$54.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:152)
>   at 
> org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:154)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>   at java.lang.Thread.run(Thread.java:745)
> java.lang.NullPointerException
>   at DataSetJoinRule$116.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.executeOnCollections(OuterJoinOperatorBase.java:113)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeBinaryOperator(CollectionExecutor.java:280)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:220)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:146)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:179)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:112)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:546)
>   at 
> org.apache.flink.api.scala.table.test.JoinITCase.testLeftJoinWithMultipleKeys(JoinITCase.scala:291)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 

[GitHub] flink pull request: [FLINK-3882][docs] Fix errors in sample Java c...

2016-05-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1971


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278805#comment-15278805
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62745064
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

This will be enabled again once type coercion rules are implemented.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before
> RelNode construction, the Table API lacks a counterpart and its validation is
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62745064
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

This will be enabled again once type coercion rules are implemented.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278792#comment-15278792
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218278581
  
Hi @fhueske, part of the comments have been resolved and the PR updated; two
TODOs remain (see the widening sketch after this list):

- [ ] Type coercion for expressions, for example:
  - `Add(int, long)` should be auto-coerced to `Add(cast(long), long)`
  - expressions expecting `double` as input but receiving `int`, such as
`ln(int)`, should be auto-cast to `ln(cast(double))`

- [ ] `TreeNode`, `FunctionCatalog` etc. rewrite.
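The first TODO is essentially a numeric widening rule; a standalone sketch of
the idea (not Flink or Calcite API):

```
import java.util.Arrays;
import java.util.List;

public class NumericWidening {

    // SQL-style numeric promotion order.
    private static final List<Class<?>> ORDER =
        Arrays.asList(Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class);

    /** The common type both operands should be cast to, e.g. (Integer, Long) -> Long. */
    public static Class<?> widerOf(Class<?> a, Class<?> b) {
        return ORDER.indexOf(a) >= ORDER.indexOf(b) ? a : b;
    }
}
```

With such a rule, `Add(int, long)` rewrites to `Add(cast(long), long)` and
`ln(int)` to `ln(cast(double))`.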


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before
> RelNode construction, the Table API lacks a counterpart and its validation is
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218278581
  
Hi @fhueske, part of the comments have been resolved and the PR updated; two
TODOs remain:

- [ ] Type coercion for expressions, for example:
  - `Add(int, long)` should be auto-coerced to `Add(cast(long), long)`
  - expressions expecting `double` as input but receiving `int`, such as
`ln(int)`, should be auto-cast to `ln(cast(double))`

- [ ] `TreeNode`, `FunctionCatalog` etc. rewrite.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278778#comment-15278778
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62742492
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project]
+    val newProjectList =
+      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+        e match {
+          case u @ UnresolvedAlias(child, optionalAliasName) => child match {
+            case ne: NamedExpression => ne
+            case e if !e.valid => u
+            case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+            case other => Alias(other, optionalAliasName.getOrElse(s"_c$i"))
+          }
+          case _ => throw new IllegalArgumentException
+        }
+      }
+    Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+    val allAlias = projectList.forall(_.isInstanceOf[Alias])
+    child.toRelNode(relBuilder)
+    if (allAlias) {
+      // Calcite's RelBuilder does not translate identity projects even if they rename fields.
+      //   Add a projection ourselves (will be automatically removed by translation rules).
+      relBuilder.push(
+        LogicalProject.create(relBuilder.peek(),
+          projectList.map(_.toRexNode(relBuilder)).asJava,
+          projectList.map(_.name).asJava))
+    } else {
+      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+    }
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] =
+    throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    if (aliasList.length > child.output.length) {
+      failValidation("Aliasing more fields than we actually have")
+    } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+      failValidation("`as` only allow string arguments")
+    } else {
+      val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+      val input = child.output
+      Project(
+        names.zip(input).map { case (name, attr) =>
+          Alias(attr, name)} ++ input.drop(names.length), child)
+    }
+  }
+}
+
+case class 

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62742492
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project]
+    val newProjectList =
+      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+        e match {
+          case u @ UnresolvedAlias(child, optionalAliasName) => child match {
+            case ne: NamedExpression => ne
+            case e if !e.valid => u
+            case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+            case other => Alias(other, optionalAliasName.getOrElse(s"_c$i"))
+          }
+          case _ => throw new IllegalArgumentException
+        }
+      }
+    Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+    val allAlias = projectList.forall(_.isInstanceOf[Alias])
+    child.toRelNode(relBuilder)
+    if (allAlias) {
+      // Calcite's RelBuilder does not translate identity projects even if they rename fields.
+      //   Add a projection ourselves (will be automatically removed by translation rules).
+      relBuilder.push(
+        LogicalProject.create(relBuilder.peek(),
+          projectList.map(_.toRexNode(relBuilder)).asJava,
+          projectList.map(_.name).asJava))
+    } else {
+      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+    }
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] =
+    throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    if (aliasList.length > child.output.length) {
+      failValidation("Aliasing more fields than we actually have")
+    } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+      failValidation("`as` only allow string arguments")
+    } else {
+      val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+      val input = child.output
+      Project(
+        names.zip(input).map { case (name, attr) =>
+          Alias(attr, name)} ++ input.drop(names.length), child)
+    }
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+    child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62742390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
--- End diff --

Moved




[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278777#comment-15278777
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62742390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
--- End diff --

Moved


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike a SQL string's execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.





[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62731427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
 
 if (isRegistered(tableName)) {
-  relBuilder.scan(tableName)
-  new Table(relBuilder.build(), this)
+  new Table(this, CatalogNode(tableName, getTable(tableName), 
getTypeFactory))
 }
 else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
--- End diff --

I think a ValidationException would be more appropriate, because `ingest()` 
belongs to a query IMO.
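
A standalone sketch (hypothetical mini-types; not Flink's API) of the suggested behavior, where looking up an unknown table during `ingest()` fails with a ValidationException because the lookup is part of query validation:

```scala
// Toy registry lookup that fails validation-style on unknown table names.
object IngestSketch {
  class ValidationException(msg: String) extends RuntimeException(msg)
  final case class Table(name: String)

  private val registry = Map("clicks" -> Table("clicks"))

  def ingest(tableName: String): Table =
    registry.getOrElse(
      tableName,
      throw new ValidationException(s"Table '$tableName' was not found in the registry."))

  def main(args: Array[String]): Unit = {
    println(ingest("clicks"))          // Table(clicks)
    // ingest("missing") would throw ValidationException
  }
}
```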




[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278682#comment-15278682
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62731427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
 
 if (isRegistered(tableName)) {
-  relBuilder.scan(tableName)
-  new Table(relBuilder.build(), this)
+  new Table(this, CatalogNode(tableName, getTable(tableName), 
getTypeFactory))
 }
 else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
--- End diff --

I think a ValidationException would be more appropriate, because `ingest()` 
belongs to a query IMO.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike a SQL string's execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.





[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278516#comment-15278516
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62715643
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
--- End diff --

👍


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62715643
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
--- End diff --

👍  
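
For reference, a standalone sketch (toy types; not Flink's classes) of the renaming semantics in `AliasNode.resolveExpressions` quoted earlier: `as` renames the first `aliasList.length` fields and passes any remaining input fields through unchanged:

```scala
// Toy model of the as(...) renaming: zip new names onto the leading fields.
object AsRenameSketch {
  final case class Attr(name: String)

  def rename(names: Seq[String], input: Seq[Attr]): Seq[Attr] = {
    require(names.length <= input.length, "Aliasing more fields than we actually have")
    names.zip(input).map { case (n, _) => Attr(n) } ++ input.drop(names.length)
  }

  def main(args: Array[String]): Unit = {
    // e.g. table.as('x, 'y) over fields (a, b, c) yields (x, y, c)
    println(rename(Seq("x", "y"), Seq(Attr("a"), Attr("b"), Attr("c"))))
  }
}
```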




[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278512#comment-15278512
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62715347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
 
 if (isRegistered(tableName)) {
-  relBuilder.scan(tableName)
-  new Table(relBuilder.build(), this)
+  new Table(this, CatalogNode(tableName, getTable(tableName), 
getTypeFactory))
 }
 else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
--- End diff --

As we discussed in 
https://github.com/apache/flink/pull/1958#discussion_r62680699 , we should keep 
this TableException, right?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike a SQL string's execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.





[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62715347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
 
 if (isRegistered(tableName)) {
-  relBuilder.scan(tableName)
-  new Table(relBuilder.build(), this)
+  new Table(this, CatalogNode(tableName, getTable(tableName), 
getTypeFactory))
 }
 else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
--- End diff --

As we discussed in 
https://github.com/apache/flink/pull/1958#discussion_r62680699 , we should keep 
this TableException, right?




[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278505#comment-15278505
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62714636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class 

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62714636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[jira] [Commented] (FLINK-3667) Generalize client<->cluster communication

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278495#comment-15278495
 ] 

ASF GitHub Bot commented on FLINK-3667:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1978

[FLINK-3667] refactor client communication

This is mainly an effort to refactor the client side of cluster 
instantiation and communication. This pull request moves around many things 
that were already in place but, hopefully, makes it easier and more explicit 
for new cluster clients/frameworks to integrate.

The main changes: 

Client
- `Client` becomes the abstract base class for client<->cluster 
communication
- It enforces a stricter life cycle for cluster communication via abstract 
methods that clients need to implement
- It shares resources, e.g. the ActorSystem, with the subclassed clients

Yarn
- All Yarn dependencies have been moved to `flink-yarn`
- The Yarn client (`YarnClusterClient`) has been refactored as a subclass of 
the `Client` class
- Yarn-specific configuration (`YarnClusterDescriptor`) has been 
implemented as a subclass of `ClusterDescriptor`

CliFrontend
- `CliFrontend` interfaces with custom command-line parsers via the 
`CustomCommandLine` interface
- `CliFrontend` doesn't use any special logic for Yarn

Other
- A few bug fixes regarding the `ApplicationClient` communication

--

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3667

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1978.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1978






> Generalize client<->cluster communication
> -
>
> Key: FLINK-3667
> URL: https://issues.apache.org/jira/browse/FLINK-3667
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>
> Here are some notes I took when inspecting the client<->cluster classes with 
> regard to future integration of other resource management frameworks in 
> addition to Yarn (e.g. Mesos).
> {noformat}
> 1 Cluster Client Abstraction
> 
> 1.1 Status Quo
> ──
> 1.1.1 FlinkYarnClient
> ╌
>   • Holds the cluster configuration (Flink-specific and Yarn-specific)
>   • Contains the deploy() method to deploy the cluster
>   • Creates the Hadoop Yarn client
>   • Receives the initial job manager address
>   • Bootstraps the FlinkYarnCluster
> 1.1.2 FlinkYarnCluster
> ╌╌
>   • Wrapper around the Hadoop Yarn client
>   • Queries cluster for status updates
>   • Life time methods to start and shutdown the cluster
>   • Flink specific features like shutdown after job completion
> 1.1.3 ApplicationClient
> ╌╌╌
>   • Acts as a middle-man for asynchronous cluster communication
>   • Designed to communicate with Yarn, not used in Standalone mode
> 1.1.4 CliFrontend
> ╌
>   • Deeply integrated with FlinkYarnClient and FlinkYarnCluster
>   • Constantly distinguishes between Yarn and Standalone mode
>   • Would be nice to have a general abstraction in place
> 1.1.5 Client
> 
>   • Job submission and Job related actions, agnostic of resource framework
> 1.2 Proposal
> 
> 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient)
> ╌
>   • Extensible cluster-agnostic config
>   • May be extended by specific cluster, e.g. YarnClusterConfig
> 1.2.2 ClusterClient (before: AbstractFlinkYarnClient)
> ╌
>   • Deals with cluster (RM) specific communication
>   • Exposes framework agnostic information
>   • YarnClusterClient, MesosClusterClient, StandaloneClusterClient
> 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster)
> 

[GitHub] flink pull request: [FLINK-3667] refactor client communication

2016-05-10 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1978

[FLINK-3667] refactor client communication

This is mainly an effort to refactor the client side of cluster 
instantiation and communication. This pull request moves around many things 
that were already in place but, hopefully, makes it easier and more explicit 
for new cluster clients/frameworks to integrate.

The main changes: 

Client
- `Client` becomes the abstract base class for client<->cluster 
communication
- It enforces a stricter life cycle for cluster communication via abstract 
methods that clients need to implement (see the sketch after this list)
- It shares resources, e.g. the ActorSystem, with the subclassed clients

Yarn
- All Yarn dependencies have been moved to `flink-yarn`
- The Yarn client (`YarnClusterClient`) has been refactored as a subclass of 
the `Client` class
- Yarn-specific configuration (`YarnClusterDescriptor`) has been 
implemented as a subclass of `ClusterDescriptor`

CliFrontend
- `CliFrontend` interfaces with custom command-line parsers via the 
`CustomCommandLine` interface
- `CliFrontend` doesn't use any special logic for Yarn

Other
- A few bug fixes regarding the `ApplicationClient` communication
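
A minimal sketch (names invented for illustration; not the PR's actual signatures) of the lifecycle shape described in the list above, where framework-specific clients only fill in the abstract hooks:

```scala
// Toy base class: owns a shared resource and drives a fixed life cycle.
abstract class ClusterClientSketch {
  protected def actorSystemName: String = "flink-client" // shared-resource stand-in

  // hooks a concrete client (Yarn, Standalone, ...) must implement
  protected def connect(): Unit
  protected def disconnect(): Unit

  final def run(job: String): String = {
    connect()
    try s"submitted $job via $actorSystemName"
    finally disconnect()
  }
}

object StandaloneClientSketch extends ClusterClientSketch {
  protected def connect(): Unit = println("connecting to standalone cluster")
  protected def disconnect(): Unit = println("disconnecting")

  def main(args: Array[String]): Unit = println(run("WordCount"))
}
```

The final template method keeps the life cycle in one place, so subclasses cannot reorder it.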

--

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3667

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1978.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1978








[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278482#comment-15278482
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62712828
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
--- End diff --

`Alias only accept name expressions as arguments` ?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> 

[GitHub] flink pull request: [FLINK-3882][docs] Fix errors in sample Java c...

2016-05-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1971#issuecomment-218223297
  
merging this PR




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62711917
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct a query plan using the Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  *
+  * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)):
+  * <ul>
+  *   <li>translate UnresolvedFieldReference into ResolvedFieldReference
+  *     using the child operator's output</li>
+  *   <li>translate Call(UnresolvedFunction) into a concrete Expression</li>
+  *   <li>generate alias names for the query output</li>
+  * </ul>
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * <ul>
+  *   <li>check that no UnresolvedFieldReference remains</li>
+  *   <li>check that all expressions have children of the needed type</li>
+  *   <li>check that each logical operator has the desired input</li>
+  * </ul>
+  *
+  * Once we pass the validation phase, we can safely convert a LogicalNode into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+// resolve references and function calls
+transformExpressionsUp {
+  case u @ UnresolvedFieldReference(name) =>
+resolveChildren(name).getOrElse(u)
+  case c @ Call(name, children) if c.childrenValid =>
+tableEnv.getFunctionCatalog.lookupFunction(name, children)
+}
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

jup. :-)
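
A standalone sketch (toy types) of the two-phase flow the scaladoc above describes, where resolution happens first and validation fails fast before any RelNode is constructed:

```scala
object ValidationPipelineSketch {
  sealed trait Expr
  final case class Unresolved(name: String) extends Expr
  final case class Resolved(name: String) extends Expr

  // phase 1: resolve field references against the child's output schema
  def resolve(e: Expr, schema: Set[String]): Expr = e match {
    case Unresolved(n) if schema(n) => Resolved(n)
    case other => other
  }

  // phase 2: reject anything still unresolved
  def validate(e: Expr): Expr = e match {
    case u: Unresolved => throw new IllegalStateException(s"unresolved field: ${u.name}")
    case r => r
  }

  def main(args: Array[String]): Unit = {
    val schema = Set("a", "b")
    println(validate(resolve(Unresolved("a"), schema))) // Resolved(a)
    // validate(resolve(Unresolved("z"), schema)) would fail fast here,
    // before any translation to a RelNode takes place
  }
}
```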




[jira] [Commented] (FLINK-3842) Fix handling null record/row in generated code

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278466#comment-15278466
 ] 

ASF GitHub Bot commented on FLINK-3842:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1974#issuecomment-218223867
  
Yes, you are right @dawidwys. 
I'm usually batching several PRs and triggering another build just to be on 
the safe side ;-)


> Fix handling null record/row in generated code
> --
>
> Key: FLINK-3842
> URL: https://issues.apache.org/jira/browse/FLINK-3842
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Dawid Wysakowicz
>Assignee: Timo Walther
>
> Handling null records/rows is needed to implement features like outer joins.
> Right now an exception is thrown:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
>   at DataSetJoinRule$54.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:152)
>   at 
> org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:154)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>   at java.lang.Thread.run(Thread.java:745)
> java.lang.NullPointerException
>   at DataSetJoinRule$116.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.executeOnCollections(OuterJoinOperatorBase.java:113)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeBinaryOperator(CollectionExecutor.java:280)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:220)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:146)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:179)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:112)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:546)
>   at 
> org.apache.flink.api.scala.table.test.JoinITCase.testLeftJoinWithMultipleKeys(JoinITCase.scala:291)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> 

[GitHub] flink pull request: [FLINK-3842] [table] Fix handling null record/...

2016-05-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1974#issuecomment-218223867
  
Yes, you are right @dawidwys. 
I'm usually batching several PRs and triggering another build just to be on 
the safe side ;-)




[jira] [Commented] (FLINK-3882) Errors in sample Java code for the Elasticsearch 2.x sink

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278463#comment-15278463
 ] 

ASF GitHub Bot commented on FLINK-3882:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1971#issuecomment-218223297
  
merging this PR


> Errors in sample Java code for the Elasticsearch 2.x sink
> -
>
> Key: FLINK-3882
> URL: https://issues.apache.org/jira/browse/FLINK-3882
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Mark Reddy
>Priority: Trivial
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> The sample Java code for the Elasticsearch 2.x sink doc[1] has numerous 
> errors in it, which seem to result from porting the Scala example to 
> Java.
> e.g.
> {code}
>   public IndexRequest createIndexRequest(String element): IndexRequest = {
> Map json = new HashMap<>()
> json.put("data", element)
> {code}
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html





[jira] [Commented] (FLINK-3842) Fix handling null record/row in generated code

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278448#comment-15278448
 ] 

ASF GitHub Bot commented on FLINK-3842:
---

Github user dawidwys commented on the pull request:

https://github.com/apache/flink/pull/1974#issuecomment-218221293
  
I think those failing tests are unrelated.


> Fix handling null record/row in generated code
> --
>
> Key: FLINK-3842
> URL: https://issues.apache.org/jira/browse/FLINK-3842
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Dawid Wysakowicz
>Assignee: Timo Walther
>
> Handling null records/rows is needed to implement features like outer joins.
> Right now an exception is thrown:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
>   at DataSetJoinRule$54.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:152)
>   at 
> org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:154)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>   at java.lang.Thread.run(Thread.java:745)
> java.lang.NullPointerException
>   at DataSetJoinRule$116.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.executeOnCollections(OuterJoinOperatorBase.java:113)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeBinaryOperator(CollectionExecutor.java:280)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:220)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:146)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:179)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:112)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:546)
>   at 
> org.apache.flink.api.scala.table.test.JoinITCase.testLeftJoinWithMultipleKeys(JoinITCase.scala:291)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[GitHub] flink pull request: [FLINK-3842] [table] Fix handling null record/...

2016-05-10 Thread dawidwys
Github user dawidwys commented on the pull request:

https://github.com/apache/flink/pull/1974#issuecomment-218221293
  
I think those failing tests are unrelated.
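
The FLINK-3842 description above notes that the generated join code hits a NullPointerException when one side of an outer join is null. A standalone sketch (toy types and an assumed null-substitution strategy; not Flink's actual code generation) of the guard it calls for:

```scala
object NullSafeJoinSketch {
  final case class Row(values: Array[Any])

  // Guard both inputs before field access; an outer join may pass null rows.
  def join(left: Row, right: Row): Row = {
    val l = if (left == null) null else left.values(0)
    val r = if (right == null) null else right.values(0)
    Row(Array(l, r))
  }

  def main(args: Array[String]): Unit = {
    println(join(Row(Array("a")), null).values.mkString(", ")) // a, null
  }
}
```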




[GitHub] flink pull request: [FLINK-3821] Reduce Guava usage in flink-java

2016-05-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1938#issuecomment-218220824
  
Thanks @zentol. Will merge this PR




[jira] [Commented] (FLINK-3821) Reduce Guava usage in flink-java

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278445#comment-15278445
 ] 

ASF GitHub Bot commented on FLINK-3821:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1938#issuecomment-218220824
  
Thanks @zentol. Will merge this PR


> Reduce Guava usage in flink-java
> 
>
> Key: FLINK-3821
> URL: https://issues.apache.org/jira/browse/FLINK-3821
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.1.0
>
>






[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278437#comment-15278437
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62709108
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    // resolve references and function calls
+    transformExpressionsUp {
+      case u @ UnresolvedFieldReference(name) =>
+        resolveChildren(name).getOrElse(u)
+      case c @ Call(name, children) if c.childrenValid =>
+        tableEnv.getFunctionCatalog.lookupFunction(name, children)
+    }
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

You mean add a
```
final def toRelNode(relBuilder: RelBuilder): RelNode = relBuilder.build()
```
to get rid of `relBuilder.build()`?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62709108
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    // resolve references and function calls
+    transformExpressionsUp {
+      case u @ UnresolvedFieldReference(name) =>
+        resolveChildren(name).getOrElse(u)
+      case c @ Call(name, children) if c.childrenValid =>
+        tableEnv.getFunctionCatalog.lookupFunction(name, children)
+    }
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

You mean add a
```
final def toRelNode(relBuilder: RelBuilder): RelNode = relBuilder.build()
```
to get rid of `relBuilder.build()`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3768) Clustering Coefficient

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278427#comment-15278427
 ] 

ASF GitHub Bot commented on FLINK-3768:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1896#discussion_r62708062
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+public class LocalClusteringCoefficient {
--- End diff --

Done.


> Clustering Coefficient
> --
>
> Key: FLINK-3768
> URL: https://issues.apache.org/jira/browse/FLINK-3768
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> The local clustering coefficient measures the connectedness of each vertex's 
> neighborhood. Values range from 0.0 (no edges between neighbors) to 1.0 
> (neighborhood is a clique).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3842) Fix handling null record/row in generated code

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278430#comment-15278430
 ] 

ASF GitHub Bot commented on FLINK-3842:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1974#issuecomment-218218785
  
Thanks @twalthr, the PR looks good. Will merge it after the tests pass. 


> Fix handling null record/row in generated code
> --
>
> Key: FLINK-3842
> URL: https://issues.apache.org/jira/browse/FLINK-3842
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Dawid Wysakowicz
>Assignee: Timo Walther
>
> Handling null record/rows is needed to implement features like outer joins.
> Right now an exception is thrown:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
>   at DataSetJoinRule$54.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:152)
>   at 
> org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:154)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>   at java.lang.Thread.run(Thread.java:745)
> java.lang.NullPointerException
>   at DataSetJoinRule$116.join(Unknown Source)
>   at 
> org.apache.flink.api.table.runtime.FlatJoinRunner.join(FlatJoinRunner.scala:48)
>   at 
> org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.executeOnCollections(OuterJoinOperatorBase.java:113)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeBinaryOperator(CollectionExecutor.java:280)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:220)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:146)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:179)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:128)
>   at 
> org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:112)
>   at 
> org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
>   at 
> org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
>   at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:546)
>   at 
> org.apache.flink.api.scala.table.test.JoinITCase.testLeftJoinWithMultipleKeys(JoinITCase.scala:291)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> 
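
The root cause is that in an outer join one input record may legitimately be null, and the generated join function dereferences it unconditionally. A minimal hand-written illustration of the guard that is needed (plain DataSet API types, not the generated code):

{code}
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Illustration only: in a left outer join, `right` is null for left rows
// without a matching right row, so it must be checked before field access.
public class NullSafeLeftJoin
		implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String> {

	@Override
	public void join(Tuple2<Integer, String> left, Tuple2<Integer, String> right,
			Collector<String> out) {
		// guard the nullable side before accessing its fields
		String rightValue = (right == null) ? "null" : right.f1;
		out.collect(left.f1 + "," + rightValue);
	}
}
{code}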

[GitHub] flink pull request: [FLINK-3842] [table] Fix handling null record/...

2016-05-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1974#issuecomment-218218785
  
Thanks @twalthr, the PR looks good. Will merge it after the tests pass. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3768] [gelly] Clustering Coefficient

2016-05-10 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1896#discussion_r62708062
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+public class LocalClusteringCoefficient {
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62708060
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    // resolve references and function calls
+    transformExpressionsUp {
+      case u @ UnresolvedFieldReference(name) =>
+        resolveChildren(name).getOrElse(u)
+      case c @ Call(name, children) if c.childrenValid =>
+        tableEnv.getFunctionCatalog.lookupFunction(name, children)
+    }
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

Yes, renaming the method would be good. I thought we could also add a 
`toRelNode()` method which actually returns a `RelNode`, in addition to the 
renamed method. The renamed method would then only be required for subclasses 
of `LogicalNode`, and other classes would not need to call `build()` on the 
`RelBuilder` but would receive a finalized `RelNode`. 
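
    For illustration, a minimal Java sketch of that split against Calcite's `RelBuilder` (the name `construct` is only assumed here, from the renaming floated elsewhere in this thread; the real class is the Scala `LogicalNode`):

```
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.RelBuilder;

// Sketch only: subclasses override the renamed builder-manipulating method
// (called `construct` here), while callers use toRelNode() and never have
// to invoke relBuilder.build() themselves.
public abstract class LogicalNodeSketch {

    // pushes this operator onto the builder's stack, e.g. relBuilder.join(...)
    protected abstract RelBuilder construct(RelBuilder relBuilder);

    // the public entry point, returning a finalized RelNode
    public final RelNode toRelNode(RelBuilder relBuilder) {
        return construct(relBuilder).build();
    }
}
```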


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278426#comment-15278426
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62708060
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    // resolve references and function calls
+    transformExpressionsUp {
+      case u @ UnresolvedFieldReference(name) =>
+        resolveChildren(name).getOrElse(u)
+      case c @ Call(name, children) if c.childrenValid =>
+        tableEnv.getFunctionCatalog.lookupFunction(name, children)
+    }
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

Yes, renaming the method would be good. I thought we could also add a 
`toRelNode()` method which actually returns a `RelNode`, in addition to the 
renamed method. The renamed method would then only be required for subclasses 
of `LogicalNode`, and other classes would not need to call `build()` on the 
`RelBuilder` but would receive a finalized `RelNode`. 


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278407#comment-15278407
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62705691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    // resolve references and function calls
+    transformExpressionsUp {
+      case u @ UnresolvedFieldReference(name) =>
+        resolveChildren(name).getOrElse(u)
+      case c @ Call(name, children) if c.childrenValid =>
+        tableEnv.getFunctionCatalog.lookupFunction(name, children)
+    }
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

The name `toRelNode` may not be proper here: this function manipulates the 
argument `relBuilder`'s inner stack using the Calcite API (`relBuilder.join`, 
`relBuilder.distinct`, etc.) and returns the `relBuilder` with a changed 
stack, much like `StringBuilder`'s `append` method. 
Maybe I can just rename it to `construct` or `apply`?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62705691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+    // resolve references and function calls
+    transformExpressionsUp {
+      case u @ UnresolvedFieldReference(name) =>
+        resolveChildren(name).getOrElse(u)
+      case c @ Call(name, children) if c.childrenValid =>
+        tableEnv.getFunctionCatalog.lookupFunction(name, children)
+    }
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

The name `toRelNode` may not be proper here: this function manipulates the 
argument `relBuilder`'s inner stack using the Calcite API (`relBuilder.join`, 
`relBuilder.distinct`, etc.) and returns the `relBuilder` with a changed 
stack, much like `StringBuilder`'s `append` method. 
Maybe I can just rename it to `construct` or `apply`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3768) Clustering Coefficient

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278391#comment-15278391
 ] 

ASF GitHub Bot commented on FLINK-3768:
---

Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/1896#discussion_r62704246
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+public class LocalClusteringCoefficient {
--- End diff --

I know this is just an example, but it would be nice to add a Javadoc 
comment giving high-level information about why this class exists.
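
As an illustration, a class-level Javadoc along these lines would do (wording assumed, taken from the issue description quoted below):

{code}
/**
 * Driver for the local clustering coefficient example.
 *
 * The local clustering coefficient measures the connectedness of each
 * vertex's neighborhood. Scores range from 0.0 (no edges between
 * neighbors) to 1.0 (neighborhood is a clique).
 */
public class LocalClusteringCoefficient {
    // ... existing implementation ...
}
{code}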


> Clustering Coefficient
> --
>
> Key: FLINK-3768
> URL: https://issues.apache.org/jira/browse/FLINK-3768
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> The local clustering coefficient measures the connectedness of each vertex's 
> neighborhood. Values range from 0.0 (no edges between neighbors) to 1.0 
> (neighborhood is a clique).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3768] [gelly] Clustering Coefficient

2016-05-10 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/1896#discussion_r62704246
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
 ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+public class LocalClusteringCoefficient {
--- End diff --

I know this is just an example, but it would be nice to add a Javadoc 
comment giving high-level information about why this class exists.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3856) Create types for java.sql.Date/Time/Timestamp

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278371#comment-15278371
 ] 

ASF GitHub Bot commented on FLINK-3856:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1959#discussion_r62702515
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Constructor;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Objects;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DateComparator;
+import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for Java SQL Date/Time/Timestamp.
+ */
+@PublicEvolving
+public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
--- End diff --

Do we want to implement a new `TypeInformation` which is basically a copy 
of `BasicTypeInfo`? 

Alternatively we could make `SqlTimeTypeInfo` just a holder for three 
`public static final BasicTypeInfo`s of the three time types. The `protected` 
constructor of `BasicTypeInfo` is visible if `SqlTimeTypeInfo` is in the same 
package as `BasicTypeInfo`. 
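
For illustration, a rough Java sketch of the holder alternative; the `BasicTypeInfo` constructor arguments below are assumptions inferred from the imports in this diff, not verified against the actual signature:

{code}
package org.apache.flink.api.common.typeinfo;

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

import org.apache.flink.api.common.typeutils.base.DateComparator;
import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;

// Holder variant: no new TypeInformation subclass, just three BasicTypeInfo
// constants built via the protected constructor, which is accessible because
// this class lives in the same package as BasicTypeInfo. Constructor
// arguments are assumptions for illustration only; DateComparator is reused
// for Time since java.sql.Time extends java.util.Date.
public final class SqlTimeTypeInfo {

	@SuppressWarnings({"unchecked", "rawtypes"})
	public static final BasicTypeInfo<Date> DATE = new BasicTypeInfo<>(
		Date.class, new Class<?>[]{}, SqlDateSerializer.INSTANCE, (Class) DateComparator.class);

	@SuppressWarnings({"unchecked", "rawtypes"})
	public static final BasicTypeInfo<Time> TIME = new BasicTypeInfo<>(
		Time.class, new Class<?>[]{}, SqlTimeSerializer.INSTANCE, (Class) DateComparator.class);

	@SuppressWarnings({"unchecked", "rawtypes"})
	public static final BasicTypeInfo<Timestamp> TIMESTAMP = new BasicTypeInfo<>(
		Timestamp.class, new Class<?>[]{}, SqlTimestampSerializer.INSTANCE,
		(Class) SqlTimestampComparator.class);

	private SqlTimeTypeInfo() {}
}
{code}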


> Create types for java.sql.Date/Time/Timestamp
> -
>
> Key: FLINK-3856
> URL: https://issues.apache.org/jira/browse/FLINK-3856
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment there is only the {{Date}} type which is not sufficient for 
> most use cases about time.
> The Table API would also benefit from having different types as output result.
> I would propose to add the three {{java.sql.}} types either as {{BasicTypes}} 
> or in an additional class {{TimeTypes}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278385#comment-15278385
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62703535
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *
 * @param name The name under which the table is registered.
 * @param table The table to register in the catalog
-* @throws TableException if another table is registered under the 
provided name.
+* @throws ValidationException if another table is registered under the 
provided name.
 */
-  @throws[TableException]
+  @throws[ValidationException]
   protected def registerTableInternal(name: String, table: AbstractTable): 
Unit = {
 
 if (isRegistered(name)) {
-  throw new TableException(s"Table \'$name\' already exists. " +
+  throw new ValidationException(s"Table \'$name\' already exists. " +
--- End diff --

Yes, that's a good point


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62703535
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *
 * @param name The name under which the table is registered.
 * @param table The table to register in the catalog
-* @throws TableException if another table is registered under the 
provided name.
+* @throws ValidationException if another table is registered under the 
provided name.
 */
-  @throws[TableException]
+  @throws[ValidationException]
   protected def registerTableInternal(name: String, table: AbstractTable): 
Unit = {
 
 if (isRegistered(name)) {
-  throw new TableException(s"Table \'$name\' already exists. " +
+  throw new ValidationException(s"Table \'$name\' already exists. " +
--- End diff --

Yes, that's a good point


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3768) Clustering Coefficient

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278380#comment-15278380
 ] 

ASF GitHub Bot commented on FLINK-3768:
---

Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1896#issuecomment-218210466
  
Please address my comments about adding Javadoc header info to ALL new Java 
classes added to the source repo. Thx!


> Clustering Coefficient
> --
>
> Key: FLINK-3768
> URL: https://issues.apache.org/jira/browse/FLINK-3768
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> The local clustering coefficient measures the connectedness of each vertex's 
> neighborhood. Values range from 0.0 (no edges between neighbors) to 1.0 
> (neighborhood is a clique).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3768] [gelly] Clustering Coefficient

2016-05-10 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1896#issuecomment-218210466
  
Please address my comments about adding Javadoc header info to ALL new Java 
classes added to the source repo. Thx!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278377#comment-15278377
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62703065
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *
 * @param name The name under which the table is registered.
 * @param table The table to register in the catalog
-* @throws TableException if another table is registered under the 
provided name.
+* @throws ValidationException if another table is registered under the 
provided name.
 */
-  @throws[TableException]
+  @throws[ValidationException]
   protected def registerTableInternal(name: String, table: AbstractTable): 
Unit = {
 
 if (isRegistered(name)) {
-  throw new TableException(s"Table \'$name\' already exists. " +
+  throw new ValidationException(s"Table \'$name\' already exists. " +
--- End diff --

It makes sense to me.
In this way, should unsupported operations for StreamTableEnvironment 
throw `TableException`?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3856) Create types for java.sql.Date/Time/Timestamp

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278378#comment-15278378
 ] 

ASF GitHub Bot commented on FLINK-3856:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1959#issuecomment-218210159
  
@twalthr, should `SqlTimeTypeInfo` implement `TypeInformation`? What do you 
think?


> Create types for java.sql.Date/Time/Timestamp
> -
>
> Key: FLINK-3856
> URL: https://issues.apache.org/jira/browse/FLINK-3856
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment there is only the {{Date}} type which is not sufficient for 
> most use cases about time.
> The Table API would also benefit from having different types as output result.
> I would propose to add the three {{java.sql.}} types either as {{BasicTypes}} 
> or in an additional class {{TimeTypes}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

2016-05-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1959#issuecomment-218210159
  
@twalthr, should `SqlTimeTypeInfo` implement `TypeInformation`? What do you 
think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62703065
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *
 * @param name The name under which the table is registered.
 * @param table The table to register in the catalog
-* @throws TableException if another table is registered under the 
provided name.
+* @throws ValidationException if another table is registered under the 
provided name.
 */
-  @throws[TableException]
+  @throws[ValidationException]
   protected def registerTableInternal(name: String, table: AbstractTable): 
Unit = {
 
 if (isRegistered(name)) {
-  throw new TableException(s"Table \'$name\' already exists. " +
+  throw new ValidationException(s"Table \'$name\' already exists. " +
--- End diff --

It makes sense to me.
In this way, should unsupported operations for StreamTableEnvironment 
throw `TableException`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3856] [core] Create types for java.sql....

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1959#discussion_r62702515
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Constructor;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Objects;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DateComparator;
+import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for Java SQL Date/Time/Timestamp.
+ */
+@PublicEvolving
+public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
--- End diff --

Do we want to implement a new `TypeInformation` which is basically a copy 
of `BasicTypeInfo`? 

Alternatively we could make `SqlTimeTypeInfo` just a holder for three 
`public static final BasicTypeInfo`s of the three time types. The `protected` 
constructor of `BasicTypeInfo` is visible if `SqlTimeTypeInfo` is in the same 
package as `BasicTypeInfo`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3896) Allow a StreamTask to be Externally Cancelled.

2016-05-10 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-3896:
--
Description: 
When implementing a custom operator, it may be useful to be able to cancel 
the task it runs in due to an external event. As an example, imagine an 
operator that spawns a thread to do some work and forward elements to the 
next operator. If an exception is thrown by that thread, it will not 
automatically be propagated to the main thread in order to cancel the task. 
In such a case, it would be useful for that thread to be able to cancel the 
task itself. This issue aims at adding exactly this functionality to the 
StreamTask. Currently a Task can do so, but this is not accessible to the 
StreamTask.
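
A minimal sketch of the scenario; `TaskLike` and `failExternally` are hypothetical stand-ins for whatever hook ends up being exposed on the StreamTask:

{code}
// Hypothetical illustration: TaskLike.failExternally stands in for the
// cancellation hook this issue proposes to expose to the StreamTask.
interface TaskLike {
	void failExternally(Throwable cause);
}

class AsyncWorkOperator {
	private final TaskLike containingTask;

	AsyncWorkOperator(TaskLike containingTask) {
		this.containingTask = containingTask;
	}

	void processElement(final String element) {
		// The work happens on a separate thread; an exception thrown there
		// never reaches the task's main thread on its own.
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					doWorkAndForward(element);
				} catch (Throwable t) {
					// propagate the failure so the whole task gets cancelled
					containingTask.failExternally(t);
				}
			}
		}).start();
	}

	private void doWorkAndForward(String element) {
		// placeholder for the actual asynchronous work and forwarding
	}
}
{code}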

> Allow a StreamTask to be Externally Cancelled.
> --
>
> Key: FLINK-3896
> URL: https://issues.apache.org/jira/browse/FLINK-3896
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> When implementing a custom operator, it may be useful to be able to cancel 
> the task it runs in due to an external event. As an example, imagine an 
> operator that spawns a thread to do some work and forward elements to the 
> next operator. If an exception is thrown by that thread, it will not 
> automatically be propagated to the main thread in order to cancel the task. 
> In such a case, it would be useful for that thread to be able to cancel the 
> task itself. This issue aims at adding exactly this functionality to the 
> StreamTask. Currently a Task can do so, but this is not accessible to the 
> StreamTask.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3896) Allow a StreamTask to be Externally Cancelled.

2016-05-10 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-3896:
-

 Summary: Allow a StreamTask to be Externally Cancelled.
 Key: FLINK-3896
 URL: https://issues.apache.org/jira/browse/FLINK-3896
 Project: Flink
  Issue Type: Bug
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3768) Clustering Coefficient

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278278#comment-15278278
 ] 

ASF GitHub Bot commented on FLINK-3768:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1896#issuecomment-218194878
  
I have rebased this PR against the newly committed dependent features so it 
should be good for discussion.


> Clustering Coefficient
> --
>
> Key: FLINK-3768
> URL: https://issues.apache.org/jira/browse/FLINK-3768
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> The local clustering coefficient measures the connectedness of each vertex's 
> neighborhood. Values range from 0.0 (no edges between neighbors) to 1.0 
> (neighborhood is a clique).
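
For reference, the standard formula behind these scores as a small Scala 
helper (assuming an undirected graph; {{t}} is the number of edges among the 
neighbors of a vertex and {{d}} its degree):

{noformat}
// local clustering coefficient: fraction of possible neighbor pairs that
// are actually connected; vertices of degree < 2 score 0.0 by convention
def localClusteringCoefficient(t: Long, d: Long): Double =
  if (d < 2) 0.0 else 2.0 * t / (d * (d - 1.0))
{noformat}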



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3895) Remove Docs for Program Packaging via Plans

2016-05-10 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-3895:
-
Summary: Remove Docs for Program Packaging via Plans  (was: Remove Focs for 
Program Packaging via Plans)

> Remove Docs for Program Packaging via Plans
> ---
>
> Key: FLINK-3895
> URL: https://issues.apache.org/jira/browse/FLINK-3895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.0.2
>Reporter: Stephan Ewen
> Fix For: 1.1.0
>
>
> As a first step, we should remove the docs that describe packaging via plans, 
> in order to avoid confusion among users.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3768] [gelly] Clustering Coefficient

2016-05-10 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1896#issuecomment-218194878
  
I have rebased this PR against the newly committed dependent features so it 
should be good for discussion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3826) Broken test: StreamCheckpointingITCase

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278261#comment-15278261
 ] 

ASF GitHub Bot commented on FLINK-3826:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1977#issuecomment-218191358
  
@kl0u Could you stop posting a comment in every PR right after you opened 
it? You can add whatever you want to say to the main post.

Could we now remove the hasFailed flag? As I see it, this will probably just 
lead to multiple parallel instances failing with an exception, causing a 
chaotic stack trace.


> Broken test: StreamCheckpointingITCase
> --
>
> Key: FLINK-3826
> URL: https://issues.apache.org/jira/browse/FLINK-3826
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> The purpose of the StreamCheckpointingITCase is to run a job, throw an 
> exception to trigger a task failure, and test the recovery process. In the 
> test, the line (270) that throws the exception to cause the task failure is 
> commented out.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3826] Broken test: StreamCheckpointingI...

2016-05-10 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1977#issuecomment-218191358
  
@kl0u Could you stop posting a comment in every PR right after you opened 
it? You can add whatever you want to say to the main post.

Could we now remove the hasFailed flag? As I see it, this will probably just 
lead to multiple parallel instances failing with an exception, causing a 
chaotic stack trace.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3893) LeaderChangeStateCleanupTest times out

2016-05-10 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278245#comment-15278245
 ] 

Maximilian Michels commented on FLINK-3893:
---

After fixing this, I get the following:

{noformat}
java.lang.IllegalStateException: The retrieval service has not been started 
properly.
at 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService.notifyListener(TestingLeaderRetrievalService.java:65)
at 
org.apache.flink.runtime.leaderelection.LeaderElectionRetrievalTestingCluster.notifyRetrievalListeners(LeaderElectionRetrievalTestingCluster.java:123)
at 
org.apache.flink.runtime.leaderelection.LeaderChangeStateCleanupTest.testStateCleanupAfterNewLeaderElectionAndListenerNotification(LeaderChangeStateCleanupTest.java:97)
{noformat}

The {{LeaderRetrievalService}} may not have been started yet (because the 
Actors are started asynchronously). I propose to remove this check.
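
A sketch of the proposed change (Scala-style; the actual test class is Java, 
and {{listener}} is the field set when the service is started):

{noformat}
def notifyListener(address: String, leaderSessionId: java.util.UUID): Unit = {
  // actors start asynchronously, so the listener may legitimately not be
  // registered yet; notify only if it is present instead of throwing
  Option(listener).foreach(_.notifyLeaderAddress(address, leaderSessionId))
}
{noformat}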

> LeaderChangeStateCleanupTest times out
> --
>
> Key: FLINK-3893
> URL: https://issues.apache.org/jira/browse/FLINK-3893
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: test-stability
>
> {{cluster.waitForTaskManagersToBeRegistered();}} needs to be replaced by 
> {{cluster.waitForTaskManagersToBeRegistered(timeout);}}
> {noformat}
> testStateCleanupAfterListenerNotification(org.apache.flink.runtime.leaderelection.LeaderChangeStateCleanupTest)
>   Time elapsed: 10.106 sec  <<< ERROR!
> java.util.concurrent.TimeoutException: Futures timed out after [1 
> milliseconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
>   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.ready(package.scala:86)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:455)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:439)
>   at 
> org.apache.flink.runtime.leaderelection.LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification(LeaderChangeStateCleanupTest.java:181)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278209#comment-15278209
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685356
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.validate
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.validate.FunctionCatalog.FunctionBuilder
+
+/**
+  * A catalog for looking up user defined functions, used by an Analyzer.
+  *
+  * Note: this is adapted from Apache Spark's FunctionRegistry.
+  */
+trait FunctionCatalog {
--- End diff --

Do we need this class except for some of the tests?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3895) Remove Focs for Program Packaging via Plans

2016-05-10 Thread Ken Krugler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278231#comment-15278231
 ] 

Ken Krugler commented on FLINK-3895:


Maybe "Remove Docs for Program..."? :)


> Remove Focs for Program Packaging via Plans
> ---
>
> Key: FLINK-3895
> URL: https://issues.apache.org/jira/browse/FLINK-3895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.0.2
>Reporter: Stephan Ewen
> Fix For: 1.1.0
>
>
> As a first step, we should remove the docs that describe packaging via plans, 
> in order to avoid confusion among users.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685650
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/exceptions.scala
 ---
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.validate
--- End diff --

Can we move this to `org.apache.flink.api.table` and also add the 
`TableException` and `ExpressionParserException`?
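
For illustration, the relocated file could look roughly like this (a sketch, 
not the final naming):

// org/apache/flink/api/table/exceptions.scala
package org.apache.flink.api.table

// all Table API exceptions collected in one place, as suggested above
case class ValidationException(msg: String) extends RuntimeException(msg)
case class TableException(msg: String) extends RuntimeException(msg)
case class ExpressionParserException(msg: String) extends RuntimeException(msg)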


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62680102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -197,6 +209,17 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 planner
   }
 
+  /**
+* Returns the Calcite 
[[org.apache.calcite.rel.`type`.RelDataTypeFactory]]
+* of this TableEnvironment. */
+  private[flink] def getTypeFactory: RelDataTypeFactory = {
--- End diff --

`getTypeFactory()` and `getTable()` are only called to create a 
`CatalogNode` which internally obtains the `rowType: RelDataType` of the table. 
I think we can simply return the `rowType` for a given table name and remove 
`getTypeFactory()` and `getTable()` from this class. 
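
A sketch of that simplification (`lookupRegisteredTable` and `typeFactory` 
are assumed names; `getRowType(RelDataTypeFactory)` is Calcite's standard 
`Table` method):

private[flink] def getRowType(tableName: String): RelDataType = {
  val table = lookupRegisteredTable(tableName)  // assumed lookup helper
  table.getRowType(typeFactory)                 // Calcite Table#getRowType
}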


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278205#comment-15278205
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62684793
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -472,61 +339,39 @@ class Table(
 tableEnv.emitToSink(this, configuredSink)
   }
 
-  private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = 
{
-
-val names = exprs.map{ e =>
-  e.getKind match {
-case SqlKind.AS =>
-  e.asInstanceOf[RexCall].getOperands.get(1)
-.asInstanceOf[RexLiteral].getValue
-.asInstanceOf[NlsString].getValue
-case SqlKind.INPUT_REF =>
-  
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
-case _ =>
-  throw new PlanGenException("Unexpected expression type 
encountered.")
-  }
-
-}
-LogicalProject.create(relNode, exprs.toList.asJava, 
names.toList.asJava)
-  }
-
   private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
--- End diff --

Can we move this check into `Project`?
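
The check itself is small, so moving it into {{Project}}'s validation could 
look roughly like this (a sketch; {{failValidation}} as used elsewhere in the 
diff):

{noformat}
// inside Project: reject duplicate output names during validation
private def checkUniqueNames(names: Seq[String]): Unit = {
  val duplicates = names.diff(names.distinct).distinct
  if (duplicates.nonEmpty) {
    failValidation(s"Duplicate field name(s): ${duplicates.mkString(", ")}")
  }
}
// e.g. invoked as checkUniqueNames(projectList.map(_.name))
{noformat}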


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218182019
  
Hi @yjshen, sorry for the time it took to review the PR. I like the 
approach and the PR looks good, IMO. I had a few minor comments and questions. 
Let me know what you think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278214#comment-15278214
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218182019
  
Hi @yjshen, sorry for the time it took to review the PR. I like the 
approach and the PR looks good, IMO. I had a few minor comments and questions. 
Let me know what you think.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278204#comment-15278204
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62684644
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class 

[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278212#comment-15278212
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685650
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/exceptions.scala
 ---
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.validate
--- End diff --

Can we move this to `org.apache.flink.api.table` and also add the 
`TableException` and `ExpressionParserException`?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278199#comment-15278199
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62684054
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class 

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683790
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
--- End diff --

I think this error message might be confusing because the Scala API does 
not use String arguments. 
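
Something along these lines might read better in both APIs (wording is only a 
suggestion):

failValidation(s"as() may only rename existing fields; expected field references, got: $aliasList")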


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3701) Cant call execute after first execution

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278197#comment-15278197
 ] 

ASF GitHub Bot commented on FLINK-3701:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1913#discussion_r62683914
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/SerializableCacheableValue.java 
---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A data structure that keeps a value which may be serialized/deserialized
+ * using a custom Classloader. The encapsulated value is kept locally for
+ * further use after a copy of the class has been serialized and shipped
+ * somewhere else.
+ *
+ * This is useful if we want to keep working with a value which may require a
+ * custom classloader in one context but is fine with the system/context
+ * Classloader in other cases. The value is cached as long as possible to
+ * prevent unnecessary serialization/deserialization.
+ */
+public class SerializableCacheableValue<T> implements Serializable {
+
+   /** The current cached value. Lost when serialized. */
+   private transient T value;
+
+   private SerializedValue<T> serializedValue;
+
+   public SerializableCacheableValue(T value) {
+   update(value);
+   }
+
+   /**
+* Custom serialization methods which always writes the latest value.
+ */
+   private void writeObject(ObjectOutputStream out) throws IOException {
+   // trigger serialization once more to update to the most recent value
+   serialize();
+   out.defaultWriteObject();
+   }
+
+   /**
+* Serialization, e.g. before shipping the class
+*/
+   public void serialize() throws IOException {
+   if (value != null) {
+   serializedValue = new SerializedValue<>(value);
+   }
+   }
+
+   /**
+* Explicit deserialization using a provided class loader.
+* @param classLoader The class loader to use
+*/
+   public void deserialize(ClassLoader classLoader) {
+   if (serializedValue != null) {
+   try {
+   value = 
serializedValue.deserializeValue(classLoader);
+   } catch (Exception e) {
+   throw new RuntimeException("Attempt to deserialize serialized data" +
+   " with class loader " + classLoader + " failed. You probably forgot to" +
+   " deserialize using a custom Classloader via deserialize(Classloader).", e);
+   }
+   }
+   }
+
+   /**
+* Gets the current value or deserializes it using the current Classloader.
+* @return the value of type T
+ */
+   public T get() {
+   if (value == null) {
+   deserialize(getClass().getClassLoader());
+   }
+   return value;
+   }
+
+   /**
+* Updates the current stored value.
+* @param value The new value of type T
+ */
+   public void update(T value) {
+   Preconditions.checkNotNull(value, "Serializable value must not 
be null.");
+   this.value = value;
+   }
+
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   SerializableCacheableValue<?> that = (SerializableCacheableValue<?>) o;
+
+   try {
+   // serialize for equality 

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3701] reuse serializer lists in Executi...

2016-05-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1913#discussion_r62683914
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/SerializableCacheableValue.java 
---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A data structure that keeps a value which may be serialized/deserialized
+ * using a custom Classloader. The encapsulated value is kept locally for
+ * further use after a copy of the class has been serialized and shipped
+ * somewhere else.
+ *
+ * This is useful if we want to keep working with a value which may require a
+ * custom classloader in one context but is fine with the system/context
+ * Classloader in other cases. The value is cached as long as possible to
+ * prevent unnecessary serialization/deserialization.
+ */
+public class SerializableCacheableValue<T> implements Serializable {
+
+   /** The current cached value. Lost when serialized. */
+   private transient T value;
+
+   private SerializedValue<T> serializedValue;
+
+   public SerializableCacheableValue(T value) {
+   update(value);
+   }
+
+   /**
+* Custom serialization methods which always writes the latest value.
+ */
+   private void writeObject(ObjectOutputStream out) throws IOException {
+   // trigger serialization once more to update to the most recent value
+   serialize();
+   out.defaultWriteObject();
+   }
+
+   /**
+* Serialization, e.g. before shipping the class
+*/
+   public void serialize() throws IOException {
+   if (value != null) {
+   serializedValue = new SerializedValue<>(value);
+   }
+   }
+
+   /**
+* Explicit deserialization using a provided class loader.
+* @param classLoader The class loader to use
+*/
+   public void deserialize(ClassLoader classLoader) {
+   if (serializedValue != null) {
+   try {
+   value = 
serializedValue.deserializeValue(classLoader);
+   } catch (Exception e) {
+   throw new RuntimeException("Attempt to deserialize serialized data" +
+   " with class loader " + classLoader + " failed. You probably forgot to" +
+   " deserialize using a custom Classloader via deserialize(Classloader).", e);
+   }
+   }
+   }
+
+   /**
+* Gets the current value or deserializes it using the current Classloader.
+* @return the value of type T
+ */
+   public T get() {
+   if (value == null) {
+   deserialize(getClass().getClassLoader());
+   }
+   return value;
+   }
+
+   /**
+* Updates the current stored value.
+* @param value The new value of type T
+ */
+   public void update(T value) {
+   Preconditions.checkNotNull(value, "Serializable value must not 
be null.");
+   this.value = value;
+   }
+
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   SerializableCacheableValue<?> that = (SerializableCacheableValue<?>) o;
+
+   try {
+   // serialize for equality check
+   serialize();
+   } catch (IOException e) {
+   throw new RuntimeException("Error while serializing for 
equality check.");
+   }
+
+   return 

[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278196#comment-15278196
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683790
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
--- End diff --

I think this error message might be confusing because the Scala API does 
not use String arguments. 


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: 

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685356
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.validate
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.validate.FunctionCatalog.FunctionBuilder
+
+/**
+  * A catalog for looking up user defined functions, used by an Analyzer.
+  *
+  * Note: this is adapted from Apache Spark's FunctionRegistry.
+  */
+trait FunctionCatalog {
--- End diff --

Do we need this class except for some of the tests?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278207#comment-15278207
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685176
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -472,61 +339,39 @@ class Table(
 tableEnv.emitToSink(this, configuredSink)
   }
 
-  private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = 
{
-
-val names = exprs.map{ e =>
-  e.getKind match {
-case SqlKind.AS =>
-  e.asInstanceOf[RexCall].getOperands.get(1)
-.asInstanceOf[RexLiteral].getValue
-.asInstanceOf[NlsString].getValue
-case SqlKind.INPUT_REF =>
-  
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
-case _ =>
-  throw new PlanGenException("Unexpected expression type 
encountered.")
-  }
-
-}
-LogicalProject.create(relNode, exprs.toList.asJava, 
names.toList.asJava)
-  }
-
   private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
 val names: mutable.Set[String] = mutable.Set()
 
 exprs.foreach {
-  case n: Naming =>
+  case n: Alias =>
 // explicit name
 if (names.contains(n.name)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$n.name.")
+  throw new ValidationException(s"Duplicate field name $n.name.")
 } else {
   names.add(n.name)
 }
   case u: UnresolvedFieldReference =>
 // simple field forwarding
 if (names.contains(u.name)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$u.name.")
+  throw new ValidationException(s"Duplicate field name $u.name.")
 } else {
   names.add(u.name)
 }
   case _ => // Do nothing
 }
   }
 
+  @inline protected def validate(logicalNode: => LogicalNode): Table = {
+new Table(tableEnv, logicalNode.validate(tableEnv))
--- End diff --

Would it be OK to remove this method and move the `new Table` call to each 
API method? 
I think this would make the code easier to read.
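
Inlined at a call site, each API method would then read roughly like this 
(sketch based on the {{UnresolvedAlias}} pattern from the diff above; 
{{logicalPlan}} stands for this table's current LogicalNode and is an assumed 
field name):

{noformat}
def select(fields: Expression*): Table = {
  val projection = Project(fields.map(f => UnresolvedAlias(f, None)), logicalPlan)
  new Table(tableEnv, projection.validate(tableEnv))
}
{noformat}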


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685176
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -472,61 +339,39 @@ class Table(
 tableEnv.emitToSink(this, configuredSink)
   }
 
-  private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = 
{
-
-val names = exprs.map{ e =>
-  e.getKind match {
-case SqlKind.AS =>
-  e.asInstanceOf[RexCall].getOperands.get(1)
-.asInstanceOf[RexLiteral].getValue
-.asInstanceOf[NlsString].getValue
-case SqlKind.INPUT_REF =>
-  
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
-case _ =>
-  throw new PlanGenException("Unexpected expression type 
encountered.")
-  }
-
-}
-LogicalProject.create(relNode, exprs.toList.asJava, 
names.toList.asJava)
-  }
-
   private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
 val names: mutable.Set[String] = mutable.Set()
 
 exprs.foreach {
-  case n: Naming =>
+  case n: Alias =>
 // explicit name
 if (names.contains(n.name)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$n.name.")
+  throw new ValidationException(s"Duplicate field name $n.name.")
 } else {
   names.add(n.name)
 }
   case u: UnresolvedFieldReference =>
 // simple field forwarding
 if (names.contains(u.name)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$u.name.")
+  throw new ValidationException(s"Duplicate field name $u.name.")
 } else {
   names.add(u.name)
 }
   case _ => // Do nothing
 }
   }
 
+  @inline protected def validate(logicalNode: => LogicalNode): Table = {
+new Table(tableEnv, logicalNode.validate(tableEnv))
--- End diff --

Would it be OK to remove this method and move the `new Table` call to each 
API method? 
I think this would make the code easier to read.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

