[jira] [Closed] (FLINK-3970) How to deal with "resource isolation" problem

2016-05-26 Thread ZhengBowen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhengBowen closed FLINK-3970.
-
Resolution: Won't Fix

> How to deal with "resource isolation" problem
> 
>
> Key: FLINK-3970
> URL: https://issues.apache.org/jira/browse/FLINK-3970
> Project: Flink
>  Issue Type: Wish
>Reporter: ZhengBowen
>
> For example, when a 'big query' and a 'small query' are executed at the same time, 
> you need to isolate the 'big query' from the 'small query' to prevent the 'big query' 
> from exhausting resources (including I/O, memory, and network), so that the 'small 
> query' can complete quickly.





[GitHub] flink pull request: FLINK-3967 - Flink Sink for RethinkDB

2016-05-26 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/flink/pull/2031#issuecomment-222030024
  
Hi @rmetzger 

Thanks for your advice/suggestions.  I will try to answer your questions 
below:

Regarding motivation for the connector - I started working with Flink late 
last year and found it had some unique features (like a streaming framework built 
on events from the ground up, and flexible windowing options).  I am 
working through some real-time data flow use cases where these capabilities 
can streamline our processing pipelines. The integration with RethinkDB came 
into play because, from the dev perspective, it is schemaless, can ingest 
streams/batches of data, and has map/reduce functionality.  From the ops/scaling 
perspective it can be scaled/re-sharded in real time without downtime. It can 
also provide change streams for a table or a document.  IMHO, Flink and 
RethinkDB complement each other for scalable stream processing/analytics use 
cases.  So I thought it might be good to contribute back to the open source 
community, hence the PR.

Regarding guidelines for code contribution - I did go through the document, 
and I thought this PR would be in the same vein as the other streaming 
connectors (Kafka, etc.), complementing them.  There is no new API or 
change in interfaces, and the code base is pretty lightweight because 
Flink and RethinkDB both make it easy to integrate. However, I did not realize 
that it would be considered a new feature and require a design review, but 
perhaps that was my oversight.

In any case, I really appreciate the time you and your team took to review 
the PR and advise me.  Please let me know your recommendation on how I 
should proceed.

Thanks, Mans





[jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303238#comment-15303238
 ] 

ASF GitHub Bot commented on FLINK-3967:
---

Github user mans2singh commented on the pull request:

https://github.com/apache/flink/pull/2031#issuecomment-222030024
  
Hi @rmetzger 

Thanks for your advice/suggestions.  I will try to answer your questions 
below:

Regarding motivation for the connector - I started working with Flink late 
last year and found it had some unique features (like a streaming framework built 
on events from the ground up, and flexible windowing options).  I am 
working through some real-time data flow use cases where these capabilities 
can streamline our processing pipelines. The integration with RethinkDB came 
into play because, from the dev perspective, it is schemaless, can ingest 
streams/batches of data, and has map/reduce functionality.  From the ops/scaling 
perspective it can be scaled/re-sharded in real time without downtime. It can 
also provide change streams for a table or a document.  IMHO, Flink and 
RethinkDB complement each other for scalable stream processing/analytics use 
cases.  So I thought it might be good to contribute back to the open source 
community, hence the PR.

Regarding guidelines for code contribution - I did go through the document, 
and I thought this PR would be in the same vein as the other streaming 
connectors (Kafka, etc.), complementing them.  There is no new API or 
change in interfaces, and the code base is pretty lightweight because 
Flink and RethinkDB both make it easy to integrate. However, I did not realize 
that it would be considered a new feature and require a design review, but 
perhaps that was my oversight.

In any case, I really appreciate the time you and your team took to review 
the PR and advise me.  Please let me know your recommendation on how I 
should proceed.

Thanks, Mans



> Provide RethinkDB Sink for Flink
> 
>
> Key: FLINK-3967
> URL: https://issues.apache.org/jira/browse/FLINK-3967
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 1.0.3
> Environment: All
>Reporter: Mans Singh
>Assignee: Mans Singh
>Priority: Minor
>  Labels: features
> Fix For: 1.1.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Provide a sink to stream data from Flink to RethinkDB.





[jira] [Commented] (FLINK-3965) Delegating GraphAlgorithm

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302926#comment-15302926
 ] 

ASF GitHub Bot commented on FLINK-3965:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/2032#issuecomment-221993882
  
There is more discussion in the ticket description, but for Gelly the idea 
is to keep algorithms small and discrete yet not duplicate computation.

My first take on this merely cached algorithm outputs and, if the 
configurations were "equal", reused the prior result.

This second take is able to merge configurations, which is much more 
powerful. We can do this because `DataSet`s are lazily evaluated and we can 
replace the old `DataSet` when we want to change how we generate the result. We 
"replace" the `DataSet` by actually wrapping it in a proxy class for which the 
`MethodHandler` always defers to the replaceable `DataSet`.
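
For illustration only (this is not the PR's code, which proxies the {{DataSet}} class and therefore needs javassist's {{MethodHandler}}): a minimal sketch of the same delegation idea using JDK dynamic proxies, where every call on the proxy defers to a replaceable target.

{code}
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicReference;

public final class DelegatingExample {

	interface Result { String value(); }

	public static void main(String[] args) {
		AtomicReference<Result> target = new AtomicReference<>(() -> "first");
		// Every method call on the proxy defers to the current target, so the
		// underlying computation can be swapped before it is evaluated.
		Result proxy = (Result) Proxy.newProxyInstance(
			Result.class.getClassLoader(),
			new Class<?>[] {Result.class},
			(p, method, methodArgs) -> method.invoke(target.get(), methodArgs));
		System.out.println(proxy.value()); // prints "first"
		target.set(() -> "second");        // "replace" the result
		System.out.println(proxy.value()); // prints "second"
	}
}
{code}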


> Delegating GraphAlgorithm
> -
>
> Key: FLINK-3965
> URL: https://issues.apache.org/jira/browse/FLINK-3965
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Complex and related algorithms often overlap in computation of data. Two such 
> examples are:
> 1) the local and global clustering coefficients each use a listing of 
> triangles
> 2) the local clustering coefficient joins on vertex degree, and the 
> underlying triangle listing annotates edge degree which uses vertex degree
> We can reuse and rewrite algorithm output by creating a {{ProxyObject}} as a 
> delegate for method calls to the {{DataSet}} returned by the algorithm.





[GitHub] flink pull request: [FLINK-3965] [gelly] Delegating GraphAlgorithm

2016-05-26 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/2032#issuecomment-221993882
  
There is more discussion in the ticket description, but for Gelly the idea 
is to keep algorithms small and discrete yet not duplicate computation.

My first take on this merely cached algorithm outputs and, if the 
configurations were "equal", reused the prior result.

This second take is able to merge configurations, which is much more 
powerful. We can do this because `DataSet`s are lazily evaluated and we can 
replace the old `DataSet` when we want to change how we generate the result. We 
"replace" the `DataSet` by actually wrapping it in a proxy class for which the 
`MethodHandler` always defers to the replaceable `DataSet`.




[jira] [Updated] (FLINK-3975) Override baseurl when serving docs locally

2016-05-26 Thread Dyana Rose (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dyana Rose updated FLINK-3975:
--
Summary: Override baseurl when serving docs locally  (was: docs build 
script isn't serving the preview on the correct base url)

> Override baseurl when serving docs locally
> --
>
> Key: FLINK-3975
> URL: https://issues.apache.org/jira/browse/FLINK-3975
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
> Environment: centos 7.2, jekyll 3.1.6, ruby 2.3.1
>Reporter: Dyana Rose
>Priority: Trivial
>
> When running the documentation build script as {{./build_docs.sh -p}}, the docs 
> are built using the correctly overridden baseurl, but they are then served 
> from the wrong URL, making it a wee bit more difficult than intended to review 
> changes locally.
> The following is the output from running the script:
> {quote}
> Configuration file: _config.yml
> Configuration file: _local_preview_conf.yml
> Source: /vagrant/flink/docs
>Destination: /vagrant/flink/docs/target
>  Incremental build: disabled. Enable with --incremental
>   Generating...
> /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57:
>  warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 
> 040777
> done in 16.736 seconds.
>  Auto-regeneration: enabled for '/vagrant/flink/docs'
> Configuration file: /vagrant/flink/docs/_config.yml
> Server address: 
> http://0.0.0.0:4000//ci.apache.org/projects/flink/flink-docs-master/
>   Server running... press ctrl-c to stop.
> {quote}
> As you can see, it looks to be using both config files to build, but only the 
> default config file to serve.
> This can be fixed by removing the {{_local_preview_conf.yml}} file and 
> instead specifying the baseurl as an option to the serve command, so it becomes 
> {{serve --config _config.yml --baseurl= --watch}}, giving an output of:
> {quote}
> Configuration file: _config.yml
> Source: /vagrant/flink/docs
>Destination: /vagrant/flink/docs/target
>  Incremental build: disabled. Enable with --incremental
>   Generating...
> /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57:
>  warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 
> 040777
> done in 15.928 seconds.
>  Auto-regeneration: enabled for '/vagrant/flink/docs'
> Configuration file: /vagrant/flink/docs/_config.yml
> Server address: http://0.0.0.0:4000/
>   Server running... press ctrl-c to stop.
> {quote}





[jira] [Commented] (FLINK-3975) docs build script isn't serving the preview on the correct base url

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302897#comment-15302897
 ] 

ASF GitHub Bot commented on FLINK-3975:
---

GitHub user dyanarose opened a pull request:

https://github.com/apache/flink/pull/2040

[FLINK-3975] [docs] Override baseurl when serving docs locally

Updating build_docs.sh to serve the docs locally with the correct URL.
This change makes build_docs.sh run the serve command equivalent to the one 
in build_docs.bat.

It also removes the now-unnecessary config file.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dyanarose/flink FLINK-3975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2040


commit 96af1da4e4770f2e6ed18d8413a6ea106927606e
Author: Dyana Rose 
Date:   2016-05-26T20:40:07Z

[FLINK-3975] [docs] Override baseurl when serving docs locally




> docs build script isn't serving the preview on the correct base url
> ---
>
> Key: FLINK-3975
> URL: https://issues.apache.org/jira/browse/FLINK-3975
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
> Environment: centos 7.2, jekyll 3.1.6, ruby 2.3.1
>Reporter: Dyana Rose
>Priority: Trivial
>
> When running the documentation build script as {{./build_docs.sh -p}}, the docs 
> are built using the correctly overridden baseurl, but they are then served 
> from the wrong URL, making it a wee bit more difficult than intended to review 
> changes locally.
> The following is the output from running the script:
> {quote}
> Configuration file: _config.yml
> Configuration file: _local_preview_conf.yml
> Source: /vagrant/flink/docs
>Destination: /vagrant/flink/docs/target
>  Incremental build: disabled. Enable with --incremental
>   Generating...
> /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57:
>  warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 
> 040777
> done in 16.736 seconds.
>  Auto-regeneration: enabled for '/vagrant/flink/docs'
> Configuration file: /vagrant/flink/docs/_config.yml
> Server address: 
> http://0.0.0.0:4000//ci.apache.org/projects/flink/flink-docs-master/
>   Server running... press ctrl-c to stop.
> {quote}
> As you can see, it looks to be using both config files to build, but only the 
> default config file to serve.
> This can be fixed by removing the {{_local_preview_conf.yml}} file and 
> instead specifying the baseurl as an option to the serve command, so it becomes 
> {{serve --config _config.yml --baseurl= --watch}}, giving an output of:
> {quote}
> Configuration file: _config.yml
> Source: /vagrant/flink/docs
>Destination: /vagrant/flink/docs/target
>  Incremental build: disabled. Enable with --incremental
>   Generating...
> /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57:
>  warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode 
> 040777
> done in 15.928 seconds.
>  Auto-regeneration: enabled for '/vagrant/flink/docs'
> Configuration file: /vagrant/flink/docs/_config.yml
> Server address: http://0.0.0.0:4000/
>   Server running... press ctrl-c to stop.
> {quote}





[GitHub] flink pull request: [FLINK-3975] [docs] Override baseurl when serv...

2016-05-26 Thread dyanarose
GitHub user dyanarose opened a pull request:

https://github.com/apache/flink/pull/2040

[FLINK-3975] [docs] Override baseurl when serving docs locally

Updating build_docs.sh to serve the docs locally with the correct URL.
This change makes build_docs.sh run the serve command equivalent to the one 
in build_docs.bat.

It also removes the now-unnecessary config file.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dyanarose/flink FLINK-3975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2040


commit 96af1da4e4770f2e6ed18d8413a6ea106927606e
Author: Dyana Rose 
Date:   2016-05-26T20:40:07Z

[FLINK-3975] [docs] Override baseurl when serving docs locally






[jira] [Updated] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3978:
--
Fix Version/s: 1.1.0

> Add hasBroadcastVariable method to RuntimeContext
> -
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but returns null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.
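
A hypothetical usage sketch of the proposed method (the variable name "thresholds" and the fallback logic are illustrative, not taken from the PR):

{code}
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class OptionalBroadcastMapper extends RichMapFunction<Long, Long> {

	private List<Long> thresholds = Collections.emptyList();

	@Override
	public void open(Configuration parameters) {
		// Today: probing for an optional broadcast variable means catching
		// and ignoring the exception.
		try {
			thresholds = getRuntimeContext().getBroadcastVariable("thresholds");
		} catch (Exception ignored) {
			// variable was not set; keep the default
		}
		// Proposed:
		// if (getRuntimeContext().hasBroadcastVariable("thresholds")) {
		//     thresholds = getRuntimeContext().getBroadcastVariable("thresholds");
		// }
	}

	@Override
	public Long map(Long value) {
		return thresholds.isEmpty() ? value : Math.max(value, thresholds.get(0));
	}
}
{code}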





[jira] [Commented] (FLINK-3910) New self-join operator

2016-05-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302853#comment-15302853
 ] 

Greg Hogan commented on FLINK-3910:
---

I see this case as part of fleshing out the remaining join operators. Flink 
could get by with {{map}}, {{reduce}}, and {{join}}, but we are kindly given 
additional operators for clarity and performance. Outer joins have been quite 
useful even though we could instead use {{coGroup}}. Anti- and semi-joins 
would be similarly useful but are for now just comments in the code.

{{selfJoin}} can have a large impact on performance. A {{reduce}} is {{O(n)}} 
but a join is {{O(n^2)}}, so data skew has a much larger effect.

How would extension classes contrast with simply marking methods as 
{{@PublicEvolving}}?

I do see that it may be desirable to defer major features to the next release 
when there is insufficient time to settle.
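
A minimal runnable sketch of how a self-join has to be expressed today, by joining a {{DataSet}} with itself (the tiny edge list is made up for illustration; the dedicated operator proposed here would replace this pattern):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SelfJoinToday {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple2<Long, Long>> edges = env.fromElements(
			Tuple2.of(1L, 2L), Tuple2.of(1L, 3L), Tuple2.of(2L, 3L));
		// Joining the data set with itself on the source vertex (field 0)
		// emits all pairs per key, which is why skew hits self-joins so hard.
		edges.join(edges).where(0).equalTo(0).print();
	}
}
{code}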


> New self-join operator
> --
>
> Key: FLINK-3910
> URL: https://issues.apache.org/jira/browse/FLINK-3910
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Flink currently provides inner- and outer-joins as well as cogroup and the 
> non-keyed cross. {{JoinOperator}} hints at future support for semi- and 
> anti-joins.
> Many Gelly algorithms perform a self-join [0]. Still pending reviews, 
> FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java 
> and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. 
> A {{SelfJoinHint}} will select between skewed and non-skewed implementations.
> The object-reuse-disabled case can be simply handled with a new {{Operator}}. 
> The object-reuse-enabled case requires either {{CopyableValue}} types (as in 
> the code above) or a custom driver which has access to the serializer (or 
> making the serializer accessible to rich functions, and I think there be 
> dragons).
> If the idea of a self-join is agreeable, I'd like to work out a rough 
> implementation and go from there.
> [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join





[jira] [Commented] (FLINK-3910) New self-join operator

2016-05-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302852#comment-15302852
 ] 

Greg Hogan commented on FLINK-3910:
---

I see this case as part of fleshing out the remaining join operators. Flink 
could get by with `map`, `reduce`, and `join`, but we are kindly given 
additional operators for clarity and performance. Outer joins have been quite 
useful even though we could instead use `coGroup`. Anti- and semi-joins would 
be similarly useful but are for now just comments in the code.

`selfJoin` can have a large impact on performance. A `reduce` is `O(n)` but a 
join is `O(n^2)`, so data skew has a much larger effect.

How would extension classes contrast with simply marking methods as 
`@PublicEvolving`?

I do see that it may be desirable to defer major features to the next release 
when there is insufficient time to settle.

> New self-join operator
> --
>
> Key: FLINK-3910
> URL: https://issues.apache.org/jira/browse/FLINK-3910
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Flink currently provides inner- and outer-joins as well as cogroup and the 
> non-keyed cross. {{JoinOperator}} hints at future support for semi- and 
> anti-joins.
> Many Gelly algorithms perform a self-join [0]. Still pending reviews, 
> FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java 
> and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. 
> A {{SelfJoinHint}} will select between skewed and non-skewed implementations.
> The object-reuse-disabled case can be simply handled with a new {{Operator}}. 
> The object-reuse-enabled case requires either {{CopyableValue}} types (as in 
> the code above) or a custom driver which has access to the serializer (or 
> making the serializer accessible to rich functions, and I think there be 
> dragons).
> If the idea of a self-join is agreeable, I'd like to work out a rough 
> implementation and go from there.
> [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join





[jira] [Issue Comment Deleted] (FLINK-3910) New self-join operator

2016-05-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3910:
--
Comment: was deleted

(was: I see this case as part of fleshing out the remaining join operators. 
Flink could get by with `map`, `reduce`, and `join`, but we are kindly given 
additional operators for clarity and performance. Outer joins have been quite 
useful even though we could instead use `coGroup`. Anti- and semi-joins would 
be similarly useful but are for now just comments in the code.

`selfJoin` can have a large impact on performance. A `reduce` is `O(n)` but a 
join is `O(n^2)`, so data skew has a much larger effect.

How would extension classes contrast with simply marking methods as 
`@PublicEvolving`?

I do see that it may be desirable to defer major features to the next release 
when there is insufficient time to settle.)

> New self-join operator
> --
>
> Key: FLINK-3910
> URL: https://issues.apache.org/jira/browse/FLINK-3910
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Flink currently provides inner- and outer-joins as well as cogroup and the 
> non-keyed cross. {{JoinOperator}} hints at future support for semi- and 
> anti-joins.
> Many Gelly algorithms perform a self-join [0]. Still pending reviews, 
> FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java 
> and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. 
> A {{SelfJoinHint}} will select between skewed and non-skewed implementations.
> The object-reuse-disabled case can be simply handled with a new {{Operator}}. 
> The object-reuse-enabled case requires either {{CopyableValue}} types (as in 
> the code above) or a custom driver which has access to the serializer (or 
> making the serializer accessible to rich functions, and I think there be 
> dragons).
> If the idea of a self-join is agreeable, I'd like to work out a rough 
> implementation and go from there.
> [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join





[jira] [Commented] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302849#comment-15302849
 ] 

ASF GitHub Bot commented on FLINK-3978:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2039

[FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext

New method RuntimeContext.hasBroadcastVariable(String).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3978_add_hasbroadcastvariable_method_to_runtimecontext

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2039.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2039


commit 39391e07f075d5e1bd2f791da4c610d766968b94
Author: Greg Hogan 
Date:   2016-05-26T18:45:00Z

[FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext

New method RuntimeContext.hasBroadcastVariable(String).




> Add hasBroadcastVariable method to RuntimeContext
> -
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but returns null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.





[GitHub] flink pull request: [FLINK-3978] [core] Add hasBroadcastVariable m...

2016-05-26 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2039

[FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext

New method RuntimeContext.hasBroadcastVariable(String).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3978_add_hasbroadcastvariable_method_to_runtimecontext

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2039.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2039


commit 39391e07f075d5e1bd2f791da4c610d766968b94
Author: Greg Hogan 
Date:   2016-05-26T18:45:00Z

[FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext

New method RuntimeContext.hasBroadcastVariable(String).






[jira] [Created] (FLINK-3980) Remove ExecutionConfig.PARALLELISM_UNKNOWN

2016-05-26 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-3980:
-

 Summary: Remove ExecutionConfig.PARALLELISM_UNKNOWN
 Key: FLINK-3980
 URL: https://issues.apache.org/jira/browse/FLINK-3980
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.1.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor
 Fix For: 1.1.0


FLINK-3589 added {{ExecutionConfig.PARALLELISM_DEFAULT}} and 
{{ExecutionConfig.PARALLELISM_UNKNOWN}}. The former gave a name to the constant 
{{-1}} and the latter was used as a default no-op when setting the parallelism.

It's nice to keep these intents separate, but given the current implementation 
of Operator parallelism, users can get by using {{PARALLELISM_DEFAULT}}.
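
A runnable sketch of the intended usage, assuming {{setParallelism}} accepts {{PARALLELISM_DEFAULT}} as described in FLINK-3589 ({{PARALLELISM_UNKNOWN}}'s no-op behavior is paraphrased in the comment, not quoted from the ticket):

{code}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelismConstantsExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.fromElements("a", "b", "c")
			.map(new MapFunction<String, String>() {
				@Override
				public String map(String value) { return value; }
			})
			// PARALLELISM_DEFAULT (-1) resets the operator to the environment
			// default, so no "if (parallelism > 0)" guard is needed;
			// PARALLELISM_UNKNOWN is the "leave unchanged" no-op that this
			// ticket proposes to remove.
			.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
			.name("identity")
			.print();
	}
}
{code}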





[jira] [Commented] (FLINK-3589) Allow setting Operator parallelism to default value

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302712#comment-15302712
 ] 

ASF GitHub Bot commented on FLINK-3589:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1778#issuecomment-221965805
  
It's good to have you back :)

I think we can get by with only `PARALLELISM_DEFAULT` since 
`java.operators.Operator.setParallelism(int)` is only called by the Scala and 
Python APIs. I'll open a ticket.


> Allow setting Operator parallelism to default value
> ---
>
> Key: FLINK-3589
> URL: https://issues.apache.org/jira/browse/FLINK-3589
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> Users can override the parallelism for a single operator by calling 
> {{Operator.setParallelism}}, which accepts a positive value. {{Operator}} 
> uses {{-1}} to indicate default parallelism. It would be nice to name and 
> accept this default value.
> This would enable user algorithms to allow configurable parallelism while 
> still chaining operator methods.
> For example, currently:
> {code}
>   private int parallelism;
>   ...
>   public void setParallelism(int parallelism) {
>   this.parallelism = parallelism;
>   }
>   ...
>   MapOperator, Edge> newEdges = 
> edges
>   .map(new MyMapFunction())
>   .name("My map function");
>   if (parallelism > 0) {
>   newEdges.setParallelism(parallelism);
>   }
> {code}
> Could be simplified to:
> {code}
>   private int parallelism = Operator.DEFAULT_PARALLELISM;
>   ...
>   public void setParallelism(int parallelism) {
>   this.parallelism = parallelism;
>   }
>   ...
>   DataSet> newEdges = edges
>   .map(new MyMapFunction())
>   .setParallelism(parallelism)
>   .name("My map function");
> {code}





[GitHub] flink pull request: [FLINK-3589] Allow setting Operator parallelis...

2016-05-26 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1778#issuecomment-221965805
  
It's good to have you back :)

I think we can get by with only `PARALLELISM_DEFAULT` since 
`java.operators.Operator.setParallelism(int)` is only called by the Scala and 
Python APIs. I'll open a ticket.




[jira] [Commented] (FLINK-3969) Log Exceptions Thrown by Invokable

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302682#comment-15302682
 ] 

Stephan Ewen commented on FLINK-3969:
-

The log statement should probably go at line 612.

https://github.com/apache/flink/blob/release-1.0.2-rc3/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L612

> Log Exceptions Thrown by Invokable
> --
>
> Key: FLINK-3969
> URL: https://issues.apache.org/jira/browse/FLINK-3969
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>
> Currently, in {{Task.run}} the exceptions thrown by 
> {{AbstractInvokable.invoke()}} are not logged. They only appear in the web 
> front-end or are forwarded to the client.
> [~davidkim] reported this problem on the mailing list.





[jira] [Commented] (FLINK-3589) Allow setting Operator parallelism to default value

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302673#comment-15302673
 ] 

ASF GitHub Bot commented on FLINK-3589:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1778#issuecomment-221960487
  
Couldn't that be equally well checked in the program that calls 
`setParallelism(...)`?

Just asking because this is handled only in very few parts of the DataSet 
API anyway (not at all handled in streaming) and seems a bit of a special case.

You may notice I am starting to ask this question about "what needs to be 
part of the API, what can be part of the user program" quite a bit lately ;-) 
I am trying to be conscious about complexity and the commitment to maintaining 
all the API constructs.


> Allow setting Operator parallelism to default value
> ---
>
> Key: FLINK-3589
> URL: https://issues.apache.org/jira/browse/FLINK-3589
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> Users can override the parallelism for a single operator by calling 
> {{Operator.setParallelism}}, which accepts a positive value. {{Operator}} 
> uses {{-1}} to indicate default parallelism. It would be nice to name and 
> accept this default value.
> This would enable user algorithms to allow configurable parallelism while 
> still chaining operator methods.
> For example, currently:
> {code}
>   private int parallelism;
>   ...
>   public void setParallelism(int parallelism) {
>   this.parallelism = parallelism;
>   }
>   ...
>   MapOperator, Edge> newEdges = 
> edges
>   .map(new MyMapFunction())
>   .name("My map function");
>   if (parallelism > 0) {
>   newEdges.setParallelism(parallelism);
>   }
> {code}
> Could be simplified to:
> {code}
>   private int parallelism = Operator.DEFAULT_PARALLELISM;
>   ...
>   public void setParallelism(int parallelism) {
>   this.parallelism = parallelism;
>   }
>   ...
>   DataSet> newEdges = edges
>   .map(new MyMapFunction())
>   .setParallelism(parallelism)
>   .name("My map function");
> {code}





[GitHub] flink pull request: [FLINK-3589] Allow setting Operator parallelis...

2016-05-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1778#issuecomment-221960487
  
Couldn't that be equally well checked in the program that calls 
`setParallelism(...)`?

Just asking because this is handled only in very few parts of the DataSet 
API anyway (not at all handled in streaming) and seems a bit of a special case.

You may notice I am starting to ask this question about "what needs to be 
part of the API, what can be part of the user program" quite a bit lately ;-) 
I am trying to be conscious about complexity and the commitment to maintaining 
all the API constructs.




[jira] [Updated] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3978:
--
Summary: Add hasBroadcastVariable method to RuntimeContext  (was: Add 
containsBroadcastVariable method to RuntimeContext)

> Add hasBroadcastVariable method to RuntimeContext
> -
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but returns null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.





[jira] [Commented] (FLINK-3723) Aggregate Functions and scalar expressions shouldn't be mixed in select

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302667#comment-15302667
 ] 

Stephan Ewen commented on FLINK-3723:
-

Very tricky. The relationship to SQL is quite a strong argument for everyone 
coming from SQL but wanting something that embeds better.

If one wants to select after {{agg}}, one probably also writes a lot of 
redundant lines.

Finally, one comment on fields that are neither grouped by nor aggregated: in 
core SQL, these should not exist, but many dialects have them. It is a bit as 
if one selects some distinct values and some non-distinct values.

> Aggregate Functions and scalar expressions shouldn't be mixed in select
> ---
>
> Key: FLINK-3723
> URL: https://issues.apache.org/jira/browse/FLINK-3723
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.1
>Reporter: Yijie Shen
>Priority: Critical
>
> When we type {code}select deptno, name, max(age) from dept group by 
> deptno;{code} in Calcite or Oracle, it will complain {code}Expression 'NAME' 
> is not being grouped{code} or {code}Column 'dept.name' is invalid in the 
> select list because it is not contained in either an aggregate function or 
> the GROUP BY clause.{code} because the result would be nondeterministic.
> Therefore, I suggest separating the current functionality of `select` into 
> two APIs: the new `select` would handle only scalar expressions, and an `agg` 
> would accept Aggregates.
> {code}
> def select(exprs: Expression*)
> def agg(aggs: Aggregation*)
> 
> tbl.groupBy('deptno)
>    .agg('age.max, 'age.min)
> {code}





[jira] [Commented] (FLINK-3910) New self-join operator

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302656#comment-15302656
 ] 

Stephan Ewen commented on FLINK-3910:
-

Let me play devil's advocate here: Every time we add new specialized 
constructs, we make our lives harder by committing to maintaining them.

There are two things to think about:
  - How much difference do these three self-join variants make? Enough to have 
them in the core? Or enough to have them in Utils or Gelly directly?
  - How deeply do we want to embed them in the DataSet API? Anything added is 
virtually impossible to remove, and a very long-term commitment.

Given the plethora of things one could add to the APIs, the cost of 
maintenance, and the benefit of a concise API, I think we need to start 
thinking about a staging process.
We could add constructs not directly to the API, but to something like 
extension classes. Then, once a construct gets used very often (let's track 
this by users who look for that operation but do not find the extension class), 
we start moving it to the core API.

That would sort of act as a steering process that leads to having the common 
and frequent operations in the core API, and the more specialized ones in 
extension classes.

What do you think?

> New self-join operator
> --
>
> Key: FLINK-3910
> URL: https://issues.apache.org/jira/browse/FLINK-3910
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Flink currently provides inner- and outer-joins as well as cogroup and the 
> non-keyed cross. {{JoinOperator}} hints at future support for semi- and 
> anti-joins.
> Many Gelly algorithms perform a self-join [0]. Still pending reviews, 
> FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java 
> and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. 
> A {{SelfJoinHint}} will select between skewed and non-skewed implementations.
> The object-reuse-disabled case can be simply handled with a new {{Operator}}. 
> The object-reuse-enabled case requires either {{CopyableValue}} types (as in 
> the code above) or a custom driver which has access to the serializer (or 
> making the serializer accessible to rich functions, and I think there be 
> dragons).
> If the idea of a self-join is agreeable, I'd like to work out a rough 
> implementation and go from there.
> [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join





[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers

2016-05-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302624#comment-15302624
 ] 

Aljoscha Krettek commented on FLINK-3758:
-

There might be one; I don't know too much about it, unfortunately. Maybe 
[~Zentol] or [~rmetzger] could help us with that.

As to whether it will replace the accumulators, I'm not sure. For me the 
accumulators always seemed the right tool for monitoring, so maybe they will be 
replaced by something better and then deprecated at some point. 
[~StephanEwen] and I recently had a short discussion about that; maybe he'd 
also like to chime in on this.

> Add possibility to register accumulators in custom triggers
> ---
>
> Key: FLINK-3758
> URL: https://issues.apache.org/jira/browse/FLINK-3758
> Project: Flink
>  Issue Type: Improvement
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>
> For monitoring purposes it would be nice to be able to use accumulators in 
> custom trigger functions. 
> Basically, the trigger context could just expose {{getAccumulator}} of 
> {{RuntimeContext}} - or does this create problems I am not aware of?
> Adding accumulators in a trigger function is more difficult, I think, but 
> that's not really necessary, as the accumulator could just be added in some 
> other upstream operator.
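
For reference, a minimal sketch of how accumulators are registered via {{RuntimeContext}} in a rich function today (the counter name is illustrative); the ticket asks for the same capability inside a custom trigger's context:

{code}
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class BufferedEventCounter extends RichMapFunction<String, String> {

	private final IntCounter bufferedEvents = new IntCounter();

	@Override
	public void open(Configuration parameters) {
		// Registration goes through RuntimeContext; the proposal is to expose
		// the same getAccumulator access to triggers.
		getRuntimeContext().addAccumulator("buffered-events", bufferedEvents);
	}

	@Override
	public String map(String value) {
		bufferedEvents.add(1);
		return value;
	}
}
{code}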





[jira] [Commented] (FLINK-3034) Redis Sink Connector

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302570#comment-15302570
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221944267
  
@subhankarb @rmetzger It's actually a good question... Should we push this 
further? Otherwise this PR is getting old.
@subhankarb: if you update this PR and want feedback, you need to add a 
comment (best to add the requested reviewers via @) to make sure it gets attention.

I just had a look, and one comment about logging the exception cause is 
still not addressed. Can you fix this and maybe rebase to the current master?


> Redis Sink Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033





[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-05-26 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221944267
  
@subhankarb @rmetzger It's actually a good question... Should we push this 
further? Otherwise this PR is getting old.
@subhankarb: if you update this PR and want feedback, you need to add a 
comment (best to add the requested reviewers via @) to make sure it gets attention.

I just had a look, and one comment about logging the exception cause is 
still not addressed. Can you fix this and maybe rebase to the current master?




[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-05-26 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302566#comment-15302566
 ] 

Robert Metzger commented on FLINK-3190:
---

[~packet], since you've requested this feature, would you be available to 
review and test the pull request?

> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, job should be terminated.
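
A minimal sketch (plain Java, not Flink code) of the windowed rate check described above, using the 10-retries-per-hour example:

{code}
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;

/** Allow a restart unless more than maxRetries failures fall in the window. */
public final class RetryRateLimiter {

	private final int maxRetries;
	private final Duration window;
	private final ArrayDeque<Instant> failures = new ArrayDeque<>();

	public RetryRateLimiter(int maxRetries, Duration window) {
		this.maxRetries = maxRetries;
		this.window = window;
	}

	public synchronized boolean allowRestart(Instant now) {
		failures.addLast(now);
		// Evict failures that have fallen out of the time window.
		while (!failures.isEmpty()
				&& failures.peekFirst().isBefore(now.minus(window))) {
			failures.removeFirst();
		}
		// An 11th failure within the hour exceeds a limit of 10 retries/hour.
		return failures.size() <= maxRetries;
	}
}
{code}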





[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers

2016-05-26 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302555#comment-15302555
 ] 

Konstantin Knauf commented on FLINK-3758:
-

[~aljoscha] Yes, in particular my goal is to monitor the number of currently 
buffered events, but in more advanced triggers there are certainly other points 
of interest. 

Is there a design document for these metrics? As far as I understand it, 
metrics will not replace accumulators, right?



> Add possibility to register accumulators in custom triggers
> ---
>
> Key: FLINK-3758
> URL: https://issues.apache.org/jira/browse/FLINK-3758
> Project: Flink
>  Issue Type: Improvement
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>
> For monitoring purposes it would be nice to be able to use accumulators in 
> custom trigger functions. 
> Basically, the trigger context could just expose {{getAccumulator}} of 
> {{RuntimeContext}} - or does this create problems I am not aware of?
> Adding accumulators in a trigger function is more difficult, I think, but 
> that's not really necessary, as the accumulator could just be added in some 
> other upstream operator.





[jira] [Commented] (FLINK-3978) Add containsBroadcastVariable method to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302501#comment-15302501
 ] 

Greg Hogan commented on FLINK-3978:
---

That was my first thought, and I wanted to name the ticket "add hassers to 
RuntimeContext", but I deferred to {{Collections}}'s nomenclature when typing 
this out.

> Add containsBroadcastVariable method to RuntimeContext
> --
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but returns null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.





[jira] [Commented] (FLINK-3978) Add containsBroadcastVariable method to RuntimeContext

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302482#comment-15302482
 ] 

Stephan Ewen commented on FLINK-3978:
-

How about calling it {{hasBroadcastVariable}}?

> Add containsBroadcastVariable method to RuntimeContext
> --
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but returns null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.





[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302384#comment-15302384
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r64779643
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
 ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
 	for (int position : fields) {
 		// Save position of compared key
 		// Get both values - both implement comparable
-		Comparable comparable1 = value1.getFieldNotNull(position);
-		Comparable comparable2 = value2.getFieldNotNull(position);
+		Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

@tillrohrmann - Any chance of a review of the updated push?


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.
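
A runnable sketch of the existing Java {{maxBy}}, which this ticket ports to the Scala DataSet API (the sample data is made up):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MaxByExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(
			Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(1, 30));
		// Selects the tuple with the maximum value in field 1: (1,30).
		data.maxBy(1).print();
	}
}
{code}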





[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-26 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r64779643
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
 ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
 	for (int position : fields) {
 		// Save position of compared key
 		// Get both values - both implement comparable
-		Comparable comparable1 = value1.getFieldNotNull(position);
-		Comparable comparable2 = value2.getFieldNotNull(position);
+		Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

@tillrohrmann - Any chance of a review of the updated push?




[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1981#issuecomment-221926857
  
Thanks for looking into how Calcite splits the join conditions. You are 
right, we do not need to be "smarter" than Calcite and approve predicates that 
will later be rejected. So +1 for your suggestion.

Apart from the documentation update, I would only extend the error messages 
with the conditions that cause the failure. After that the PR should be good to 
merge.




[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1981#discussion_r64778861
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -298,11 +338,40 @@ case class Join(
     val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
     if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
       failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean")
-    } else if (!ambiguousName.isEmpty) {
+    } else if (ambiguousName.nonEmpty) {
       failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
     }
+
+    resolvedJoin.condition.foreach(testJoinCondition(_))
     resolvedJoin
   }
+
+  private def testJoinCondition(expression: Expression): Unit = {
+    def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
+      case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
+        if x.isFromLeftInput != y.isFromLeftInput => Unit
+      case _ => failValidation(
+        s"Only join predicates supported. For non-join predicates use Table#where.")
+    }
+
+    var equiJoinFound = false
+    def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
+      case x: And => x.children.foreach(validateConditions(_, isAndBranch))
+      case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
+      case x: EqualTo =>
+        if (isAndBranch) {
+          equiJoinFound = true
+        }
+        checkIfJoinCondition(x)
+      case x: BinaryComparison => checkIfJoinCondition(x)
+      case x => failValidation(s"Unsupported condition type: ${x.getClass.getSimpleName}.")
+    }
+
+    validateConditions(expression, isAndBranch = true)
+    if (!equiJoinFound) {
+      failValidation(s"At least one equi-join required.")
--- End diff --

Add the whole join predicate to the error message?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1981#discussion_r64778778
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -298,11 +338,40 @@ case class Join(
 val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
 if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) 
{
   failValidation(s"filter expression ${resolvedJoin.condition} is not 
a boolean")
-} else if (!ambiguousName.isEmpty) {
+} else if (ambiguousName.nonEmpty) {
   failValidation(s"join relations with ambiguous names: 
${ambiguousName.mkString(", ")}")
 }
+
+resolvedJoin.condition.foreach(testJoinCondition(_))
 resolvedJoin
   }
+
+  private def testJoinCondition(expression: Expression): Unit = {
+def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
+case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
+  if x.isFromLeftInput != y.isFromLeftInput => Unit
+case _ => failValidation(
+  s"Only join predicates supported. For non-join predicates use 
Table#where.")
--- End diff --

Add the condition that causes the failure to the error message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis Sink Connector

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302332#comment-15302332
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user rphillips commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221921229
  
Very handy. Thanks


> Redis Sink Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-05-26 Thread rphillips
Github user rphillips commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221921229
  
Very handy. Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3978) Add containsBroadcastVariable methods to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3978:
--
Summary: Add containsBroadcastVariable methods to RuntimeContext  (was: Add 
contains methods to RuntimeContext)

> Add containsBroadcastVariable methods to RuntimeContext
> ---
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but will return null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
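
A minimal sketch of the pattern this ticket is about, assuming a {{RichMapFunction}} and an optional broadcast variable named "thresholds" (both illustrative); the {{containsBroadcastVariable}} call in the comment is the proposed method and does not exist yet:

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class OptionalBroadcastMapper extends RichMapFunction<Integer, Integer> {

    private List<Integer> thresholds;

    @Override
    public void open(Configuration parameters) {
        // Today: the only way to handle a possibly-missing broadcast
        // variable is to catch and ignore the exception.
        try {
            thresholds = getRuntimeContext().getBroadcastVariable("thresholds");
        } catch (Exception e) {
            thresholds = Collections.emptyList();
        }
        // Proposed (not yet existing) alternative, making the probe explicit:
        // if (getRuntimeContext().containsBroadcastVariable("thresholds")) { ... }
    }

    @Override
    public Integer map(Integer value) {
        return thresholds.contains(value) ? 0 : value;
    }
}
```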


[jira] [Updated] (FLINK-3978) Add containsBroadcastVariable method to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3978:
--
Summary: Add containsBroadcastVariable method to RuntimeContext  (was: Add 
containsBroadcastVariable methods to RuntimeContext)

> Add containsBroadcastVariable method to RuntimeContext
> --
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but will return null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-05-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221913785
  
Actually, it's quite easy to implement a Redis sink yourself.

Check out the `RedisResultSink` in this example: 
https://github.com/dataArtisans/yahoo-streaming-benchmark/blob/b3d35a761bae468affed92ef70d11739ddc9d432/flink-benchmarks/src/main/java/flink/benchmark/AdvertisingTopologyFlinkWindows.java#L319


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
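
For context, a minimal sketch of what such a self-written sink can look like, assuming the Jedis client and an illustrative (key, value) record layout; this is only the general shape, not the `RedisResultSink` from the benchmark:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import redis.clients.jedis.Jedis;

// Writes (key, value) pairs to Redis; each parallel subtask opens its own connection.
public class SimpleRedisSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("localhost", 6379);
    }

    @Override
    public void invoke(Tuple2<String, String> record) {
        jedis.set(record.f0, record.f1); // SET key value
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}
```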


[jira] [Commented] (FLINK-3034) Redis Sink Connector

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302297#comment-15302297
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221913785
  
Actually, it's quite easy to implement a Redis sink yourself.

Check out the `RedisResultSink` in this example: 
https://github.com/dataArtisans/yahoo-streaming-benchmark/blob/b3d35a761bae468affed92ef70d11739ddc9d432/flink-benchmarks/src/main/java/flink/benchmark/AdvertisingTopologyFlinkWindows.java#L319


> Redis Sink Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3979) [documentation] add missing import classes in run_example_quickstart

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302267#comment-15302267
 ] 

ASF GitHub Bot commented on FLINK-3979:
---

GitHub user jiazhai opened a pull request:

https://github.com/apache/flink/pull/2038

[FLINK-3979] documentation - add missing import classes in 
run_example_quickstart

The classes that need to be imported for this part of code
```
result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", 
        new SimpleStringSchema()));
```

are

```
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiazhai/flink f3979

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2038.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2038


commit 84de7466757c86e968eb9ddce8ba7865821e9091
Author: Zhai Jia 
Date:   2016-05-26T15:24:40Z

[FLINK-3979] add import for code




> [documentation] add missing import classes in run_example_quickstart
> --
>
> Key: FLINK-3979
> URL: https://issues.apache.org/jira/browse/FLINK-3979
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Jia Zhai
>
> The classes that need to be imported for this part of code
> {code}
> result
> .map(new MapFunction<Tuple2<String, Long>, String>() {
> @Override
> public String map(Tuple2<String, Long> tuple) {
> return tuple.toString();
> }
> })
> .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new 
> SimpleStringSchema()));
> {code}
> are 
> {code}
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.functions.MapFunction;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
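
Putting the three imports and the snippet together, a minimal sketch of the resulting code; the wrapping class and the {{result}} stream of {{Tuple2<String, Long>}} are assumed from the quickstart context, not part of the fix itself:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WikiResultSink {

    // 'result' is the (user, byte-diff) stream built earlier in the quickstart.
    public static void writeToKafka(DataStream<Tuple2<String, Long>> result) {
        result
            .map(new MapFunction<Tuple2<String, Long>, String>() {
                @Override
                public String map(Tuple2<String, Long> tuple) {
                    return tuple.toString();
                }
            })
            .addSink(new FlinkKafkaProducer08<>(
                "localhost:9092", "wiki-result", new SimpleStringSchema()));
    }
}
```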


[jira] [Commented] (FLINK-3034) Redis Sink Connector

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302266#comment-15302266
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user rphillips commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221908345
  
What is the status of this PR? For my use case it would be very handy.


> Redis Sink Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-05-26 Thread rphillips
Github user rphillips commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-221908345
  
What is the status of this PR? For my use case it would be very handy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3979] documentation - add missing import...

2016-05-26 Thread jiazhai
GitHub user jiazhai opened a pull request:

https://github.com/apache/flink/pull/2038

[FLINK-3979] documentation - add missing import classes in 
run_example_quickstart

The classes that need to be imported for this part of code
```
result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", 
        new SimpleStringSchema()));
```

are

```
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiazhai/flink f3979

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2038.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2038


commit 84de7466757c86e968eb9ddce8ba7865821e9091
Author: Zhai Jia 
Date:   2016-05-26T15:24:40Z

[FLINK-3979] add import for code




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302254#comment-15302254
 ] 

ASF GitHub Bot commented on FLINK-3923:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64767022
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -160,12 +168,13 @@ public void 
setCustomPartitioner(KinesisPartitioner partitioner) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
 
-   KinesisProducerConfiguration config = new 
KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new 
BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
+
+   
producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
//config.setCollectionMaxCount(1);
//config.setAggregationMaxCount(1);
--- End diff --

I agree that using the KPL is in general a good idea.
The problem is that the license of the KPL (the Amazon Software License) is 
not compatible with the Apache Software License; therefore, projects at Apache 
cannot depend on such code.
We will not release the Flink kinesis connector with the next Flink release 
for that reason (unless we somehow fix this issue).

Are you working at Amazon? If so, can you contact me at robert (at) 
data-artisans.com ?


> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
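
As a sketch of what the unified, properties-based producer configuration could look like from the user's side; only {{KinesisConfigConstants.CONFIG_AWS_REGION}} appears in this thread, so the import location, the credential property keys, and the constructor shape below are assumptions for illustration:

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisProducerConfigSketch {

    public static FlinkKinesisProducer<String> createProducer() {
        Properties configProps = new Properties();
        configProps.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
        // Hypothetical credential property keys, mirroring the consumer's style:
        configProps.setProperty("aws.credentials.provider.basic.accesskeyid", "<access-key>");
        configProps.setProperty("aws.credentials.provider.basic.secretkey", "<secret-key>");

        // Assumed constructor: serialization schema plus validated properties.
        return new FlinkKinesisProducer<>(new SimpleStringSchema(), configProps);
    }
}
```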


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64767022
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -160,12 +168,13 @@ public void 
setCustomPartitioner(KinesisPartitioner partitioner) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
 
-   KinesisProducerConfiguration config = new 
KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new 
BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
+
+   
producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
//config.setCollectionMaxCount(1);
//config.setAggregationMaxCount(1);
--- End diff --

I agree that using the KPL is in general a good idea.
The problem is that the license of the KPL (the Amazon Software License) is 
not compatible with the Apache Software License; therefore, projects at Apache 
cannot depend on such code.
We will not release the Flink kinesis connector with the next Flink release 
for that reason (unless we somehow fix this issue).

Are you working at Amazon? If so, can you contact me at robert (at) 
data-artisans.com ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1981#issuecomment-221903295
  
Ah, sorry. No, I didn't see your comment. Will think about that and reply 
later. Thanks! :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types

2016-05-26 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3936.

Resolution: Implemented

Implemented with 5784f395536d8a5e6f7d8bfd28bd9c8c0ed99b18

> Add MIN / MAX aggregations function for BOOLEAN types
> -
>
> Key: FLINK-3936
> URL: https://issues.apache.org/jira/browse/FLINK-3936
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.1.0
>
>
> When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on 
> Boolean literals to translate a decorrelated subquery in an {{EXISTS}} 
> predicate.
> MIN and MAX aggregates on Boolean data types are currently not supported and 
> should be added.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3696) Some Union tests fail for TableConfigMode.EFFICIENT

2016-05-26 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3696.

   Resolution: Fixed
 Assignee: Yijie Shen
Fix Version/s: 1.1.0

Fixed with ef5832d8f5867826a60f87e2fcaef912dc2950f6

> Some Union tests fail for TableConfigMode.EFFICIENT
> ---
>
> Key: FLINK-3696
> URL: https://issues.apache.org/jira/browse/FLINK-3696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Yijie Shen
> Fix For: 1.1.0
>
>
> e.g. testUnionWithFilter gives the following exception:
> {code}
> org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of 
> different types. Input1=scala.Tuple3(_1: Integer, _2: Long, _3: String), 
> input2=Java Tuple3<Integer, Long, String>
>   at 
> org.apache.flink.api.java.operators.UnionOperator.<init>(UnionOperator.java:47)
>   at org.apache.flink.api.java.DataSet.union(DataSet.java:1208)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetUnion.translateToPlan(DataSetUnion.scala:81)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:95)
>   at 
> org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:91)
>   at 
> org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51)
>   at 
> org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:43)
>   at 
> org.apache.flink.api.scala.table.test.UnionITCase.testUnionWithFilter(UnionITCase.scala:77)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runners.Suite.runChild(Suite.java:127)
>   at org.junit.runners.Suite.runChild(Suite.java:26)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runners.Suite.runChild(Suite.java:127)
>   at org.junit.runners.Suite.runChild(Suite.java:26)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
> 

[jira] [Closed] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-26 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3941.

   Resolution: Implemented
Fix Version/s: 1.1.0

Implemented with ef5832d8f5867826a60f87e2fcaef912dc2950f6

> Add support for UNION (with duplicate elimination)
> --
>
> Key: FLINK-3941
> URL: https://issues.apache.org/jira/browse/FLINK-3941
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Yijie Shen
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, only UNION ALL is supported by Table API and SQL.
> UNION (with duplicate elimination) can be supported by applying a 
> {{DataSet.distinct()}} after the union on all fields. This issue includes:
> - Extending {{DataSetUnion}}
> - Relaxing {{DataSetUnionRule}} to translate non-all unions.
> - Extend the Table API with union() method.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

2016-05-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2025


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302217#comment-15302217
 ] 

ASF GitHub Bot commented on FLINK-3936:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2035


> Add MIN / MAX aggregations function for BOOLEAN types
> -
>
> Key: FLINK-3936
> URL: https://issues.apache.org/jira/browse/FLINK-3936
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.1.0
>
>
> When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on 
> Boolean literals to translate a decorrelated subquery in an {{EXISTS}} 
> predicate.
> MIN and MAX aggregates on Boolean data types are currently not supported and 
> should be added.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread dawidwys
Github user dawidwys commented on the pull request:

https://github.com/apache/flink/pull/1981#issuecomment-221901395
  
I am not sure if you've seen my comment (since it was on a previous commit 
version). Let me paste my doubts about CNF one more time:

> I am not sure if the normalization to CNF is necessary. Calcite does not 
do it, but only searches the AND operators' subtrees for equi-conditions. E.g., 
for the condition (l.a = r.b AND l.c = r.d) OR (l.a = r.b AND l.e = r.f), after 
transformation to CNF it would be possible to find an equi-condition, but 
Calcite does not find it (see RelOptUtil#splitJoinCondition), which results in a 
failing DataSet join.
> 
> My proposition would be:
> 
> traverse the whole tree (both OR and AND branches), checking that only 
join conditions exist
> check that an equi-condition exists in an AND branch



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
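
For reference, the distributive law makes the example above concrete: writing $A$ for $l.a = r.b$, $B$ for $l.c = r.d$, and $D$ for $l.e = r.f$, the condition normalizes as

$$(A \land B) \lor (A \land D) \;\equiv\; (A \lor A) \land (A \lor D) \land (B \lor A) \land (B \lor D) \;\equiv\; A \land (B \lor D),$$

so after CNF conversion the equi-condition $A$ is a top-level conjunct that a splitter scanning only the AND branches (such as {{RelOptUtil#splitJoinCondition}}) could find.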


[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302218#comment-15302218
 ] 

ASF GitHub Bot commented on FLINK-3941:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2025


> Add support for UNION (with duplicate elimination)
> --
>
> Key: FLINK-3941
> URL: https://issues.apache.org/jira/browse/FLINK-3941
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Yijie Shen
>Priority: Minor
>
> Currently, only UNION ALL is supported by Table API and SQL.
> UNION (with duplicate elimination) can be supported by applying a 
> {{DataSet.distinct()}} after the union on all fields. This issue includes:
> - Extending {{DataSetUnion}}
> - Relaxing {{DataSetUnionRule}} to translate non-all unions.
> - Extend the Table API with union() method.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3936] [tableAPI] Add MIN/MAX aggregatio...

2016-05-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2035


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3979) [documentation] add missing import classes in run_example_quickstart

2016-05-26 Thread Jia Zhai (JIRA)
Jia Zhai created FLINK-3979:
---

 Summary: [documentation] add missing import classes in 
run_example_quickstart
 Key: FLINK-3979
 URL: https://issues.apache.org/jira/browse/FLINK-3979
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Jia Zhai


The classes that need to be imported for this part of code
{code}
result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new 
        SimpleStringSchema()));
{code}

are 

{code}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/1981#discussion_r64763844
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -342,21 +368,29 @@ case class Join(
   }
 
   private def testJoinCondition(expression: Expression): Unit = {
-def checkIfJoinCondition(exp : Expression) = if 
(exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
-  failValidation(s"Only join predicates supported. For non-join 
predicates use Table#where.")
-}
+def checkIfJoinCondition(exp : BinaryComparison) =
+  if (exp.children match {
+case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil  
=>
+  x.belongsToLeft == y.belongsToLeft
+case _ => true
+  }) {
+failValidation(s"Only join predicates supported. For non-join 
predicates use Table#where.")
+  }
 
 var equiJoinFound = false
-def validateConditions(exp: Expression) : Unit = exp match {
-  case x: And => x.children.foreach(validateConditions(_))
+def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = 
exp match {
--- End diff --

You're absolutely right, my mistake, sorry! I fixed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1981#issuecomment-221899756
  
Hi @dawidwys, the equi join check is a bit more involved. I implemented a 
CNF converter based on the algorithm described here: 
http://cs.jhu.edu/~jason/tutorials/convert-to-CNF (see my 
[branch](https://github.com/fhueske/flink/tree/tableOuter) ).
You can try to implement the algorithm as well. Alternatively, we can 
also use my version.

Another thing we need to fix is the documentation, i.e., extending the 
Table API description and removing the outer join restriction in the SQL 
section in `docs/apis/table.md`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1981#discussion_r64762144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -342,21 +368,29 @@ case class Join(
   }
 
   private def testJoinCondition(expression: Expression): Unit = {
-def checkIfJoinCondition(exp : Expression) = if 
(exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
-  failValidation(s"Only join predicates supported. For non-join 
predicates use Table#where.")
-}
+def checkIfJoinCondition(exp : BinaryComparison) =
+  if (exp.children match {
+case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil  
=>
+  x.belongsToLeft == y.belongsToLeft
+case _ => true
+  }) {
+failValidation(s"Only join predicates supported. For non-join 
predicates use Table#where.")
+  }
 
 var equiJoinFound = false
-def validateConditions(exp: Expression) : Unit = exp match {
-  case x: And => x.children.foreach(validateConditions(_))
+def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = 
exp match {
--- End diff --

I'm sorry but this check does not catch all cases. For example, it would 
let `l.a = r.b OR (l.c = r.d && l.e = r.f)` pass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-05-26 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/1895


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3889) Make File Monitoring Function checkpointable.

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302184#comment-15302184
 ] 

ASF GitHub Bot commented on FLINK-3889:
---

Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/1984


> Make File Monitoring Function checkpointable.
> -
>
> Key: FLINK-3889
> URL: https://issues.apache.org/jira/browse/FLINK-3889
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> This is essentially the combination of FLINK-3808 and FLINK-3717.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3717) Add functionality to be a able to restore from specific point in a FileInputFormat

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302185#comment-15302185
 ] 

ASF GitHub Bot commented on FLINK-3717:
---

Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/1895


> Add functionality to be a able to restore from specific point in a 
> FileInputFormat
> --
>
> Key: FLINK-3717
> URL: https://issues.apache.org/jira/browse/FLINK-3717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> This is the first step in order to make the File Sources fault-tolerant. We 
> have to be able to restart reading from a specific point in a file despite any 
> caching performed during reading. This will guarantee that the task that will 
> take over the execution of the failed one will be able to start from the 
> correct point in the file.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
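
One plausible shape for such a restartable format, sketched purely as an assumption (the interface name and signatures are illustrative, not taken from the PR):

```java
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.core.fs.FileInputSplit;

// Hypothetical sketch: an input format that can report its exact read
// position in a split and resume from it after a failure.
public interface RestorableInputFormat<S extends Serializable> {

    // Snapshot the current position in the split, e.g. the byte offset before
    // any read-ahead buffering, so a recovering task can resume precisely here.
    S getCurrentState() throws IOException;

    // Reopen the given split and seek to a previously snapshotted position.
    void reopen(FileInputSplit split, S state) throws IOException;
}
```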


[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-26 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/1984


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302164#comment-15302164
 ] 

ASF GitHub Bot commented on FLINK-3655:
---

Github user gna-phetsarath commented on the pull request:

https://github.com/apache/flink/pull/1990#issuecomment-221890183
  
You are correct, the majority of the changes were in the "generate splits" 
method and "statistics" methods which included changes to subclasses that used 
the file path directly.  Not as extensive as it appears.

Also, additional tests were added.


> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302170#comment-15302170
 ] 

ASF GitHub Bot commented on FLINK-3923:
---

Github user aozturk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64757778
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -160,12 +168,13 @@ public void 
setCustomPartitioner(KinesisPartitioner partitioner) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
 
-   KinesisProducerConfiguration config = new 
KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new 
BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
+
+   
producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
//config.setCollectionMaxCount(1);
//config.setAggregationMaxCount(1);
--- End diff --

Ah, sorry, I went back to the code to be sure, and it seems the KPL is 
already used.


> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread aozturk
Github user aozturk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64757778
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -160,12 +168,13 @@ public void 
setCustomPartitioner(KinesisPartitioner partitioner) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
 
-   KinesisProducerConfiguration config = new 
KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new 
BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
+
+   
producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
//config.setCollectionMaxCount(1);
//config.setAggregationMaxCount(1);
--- End diff --

Ah, sorry, I went back to the code to be sure, and it seems the KPL is 
already used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3655] Multiple File Paths for InputFile...

2016-05-26 Thread gna-phetsarath
Github user gna-phetsarath commented on the pull request:

https://github.com/apache/flink/pull/1990#issuecomment-221890183
  
You are correct, the majority of the changes were in the "generate splits" 
method and "statistics" methods which included changes to subclasses that used 
the file path directly.  Not as extensive as it appears.

Also, additional tests were added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302154#comment-15302154
 ] 

ASF GitHub Bot commented on FLINK-3923:
---

Github user aozturk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64756634
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -160,12 +168,13 @@ public void 
setCustomPartitioner(KinesisPartitioner partitioner) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
 
-   KinesisProducerConfiguration config = new 
KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new 
BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
+
+   
producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
//config.setCollectionMaxCount(1);
//config.setAggregationMaxCount(1);
--- End diff --

Sorry again for my late response. Meanwhile I have thought about this and I 
have come up with a question. I had noticed a remark about not using the KCL 
library, but is there any special reason why we are not using KPL? Anything we 
intend to do will probably replicate it, including its retry mechanism on 
failures, multi-threading, and asynchronous writes.

We also need to take into consideration that our producer solution should 
be cost-effective by using the opportunities for batching, which can be 
customized by the user if low latency is preferred.


> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread aozturk
Github user aozturk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64756634
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -160,12 +168,13 @@ public void 
setCustomPartitioner(KinesisPartitioner partitioner) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
 
-   KinesisProducerConfiguration config = new 
KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new 
BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
+
+   
producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
//config.setCollectionMaxCount(1);
//config.setAggregationMaxCount(1);
--- End diff --

Sorry again for my late response. Meanwhile I have thought about this and I 
have come up with a question. I had noticed a remark about not using the KCL 
library, but is there any special reason why we are not using KPL? Anything we 
intend to do will probably replicate it, including its retry mechanism on 
failures, multi-threading, and asynchronous writes.

We also need to take into consideration that our producer solution should 
be cost-effective by using the opportunities for batching, which can be 
customized by the user if low latency is preferred.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302127#comment-15302127
 ] 

Stephan Ewen commented on FLINK-3974:
-

Yes, that is a pretty clear bug.

I guess the best workaround for now is to disable the object reuse mode. Object 
reuse does not really work well in the DataStream API right now; it works 
pretty well in the DataSet API.

Another quick workaround is to not chain the two different map functions, using 
{{.disableChaining()}}.

The solution should be quite straightforward, though:
  - Do not chain at "splitting" flows any more. I would actually like that 
solution. For splitting flows, it seems like a good heuristic to start a new 
chain/thread by default.
  - Each collector should use its own dedicated stream record. That would 
circumvent the ClassCastException at least, but it would still be dangerous if 
the mappers actually alter the events.

> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction...)
> .addSink(...);
> input
> .map(MapFunction...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
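
A minimal sketch of the {{.disableChaining()}} workaround on a concrete topology; the types and job structure are illustrative stand-ins for the {{A}}/{{B}} types from the report:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink3974WorkaroundSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse(); // the setting that triggers the bug

        DataStream<Integer> input = env.fromElements(1, 2, 3);

        input
            .map(new MapFunction<Integer, String>() {
                @Override
                public String map(Integer value) {
                    return "s" + value; // would replace the value in a shared record
                }
            })
            .disableChaining() // workaround: break the chain at the split
            .print();

        input
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .disableChaining()
            .print();

        env.execute("FLINK-3974 workaround sketch");
    }
}
```

Alternatively, as noted above, simply not calling {{enableObjectReuse()}} avoids the problem entirely.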


[jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302143#comment-15302143
 ] 

ASF GitHub Bot commented on FLINK-3967:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2031#issuecomment-221886697
  
Hi @mans2singh,
thanks a lot for this contribution. I wonder what's your motivation to 
implement a flink --> rethinkdb connector? I'm asking because the Flink 
community cannot accept every contribution, in particular if it's adding a 
large new component we need to maintain.
Only if there are enough users and developers willing to support a feature can 
we accept the contribution.
I'm a bit sorry to tell you this after you've implemented the code; on the 
other hand, our guidelines [1] clearly state that new features need to be 
discussed with the community first.

The good news is that we have a special section for community-maintained 
modules on our website [2]. Therefore, I suggest that you put the code into a 
separate github repository and open a pull request to add a link to that 
repository.
The good thing about this approach is also that we don't have to fix the 
license issue immediately.
Also, it doesn't mean that I'm against this contribution at all. If we see 
many users using the module from your github repository, and there are people 
in the community willing to maintain it, we can also look into merging it.

[1] http://flink.apache.org/contribute-code.html#before-you-start-coding
[2] http://flink.apache.org/community.html#third-party-packages



> Provide RethinkDB Sink for Flink
> 
>
> Key: FLINK-3967
> URL: https://issues.apache.org/jira/browse/FLINK-3967
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 1.0.3
> Environment: All
>Reporter: Mans Singh
>Assignee: Mans Singh
>Priority: Minor
>  Labels: features
> Fix For: 1.1.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Provide Sink to stream data from flink to rethink db.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db

2016-05-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2031#issuecomment-221886697
  
Hi @mans2singh,
thanks a lot for this contribution. I wonder what's your motivation to 
implement a flink --> rethinkdb connector? I'm asking because the Flink 
community cannot accept every contribution, in particular if it's adding a 
large new component we need to maintain.
Only if there are enough users and developers willing to support a feature can 
we accept the contribution.
I'm a bit sorry to tell you this after you've implemented the code; on the 
other hand, our guidelines [1] clearly state that new features need to be 
discussed with the community first.

The good news is that we have a special section for community-maintained 
modules on our website [2]. Therefore, I suggest that you put the code into a 
separate github repository and open a pull request to add a link to that 
repository.
The good thing about this approach is also that we don't have to fix the 
license issue immediately.
Also, it doesn't mean that I'm against this contribution at all. If we see 
many users using the module from your github repository, and there are people 
in the community willing to maintain it, we can also look into merging it.

[1] http://flink.apache.org/contribute-code.html#before-you-start-coding
[2] http://flink.apache.org/community.html#third-party-packages



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302141#comment-15302141
 ] 

ASF GitHub Bot commented on FLINK-3923:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221885858
  
Sure, no problem at all. It's just for my own time allocation. Thanks for 
the quick response.


> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)
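
For illustration, a unified producer configuration could look roughly like the 
following sketch. The property keys and the (schema, properties) constructor are 
assumptions mirroring the consumer's style, not the final Flink API:

{code}
// Illustrative sketch only: key names and constructor are assumptions.
Properties producerConfig = new Properties();
producerConfig.put("aws.region", "us-east-1");
producerConfig.put("aws.accessKeyId", "<access-key>");
producerConfig.put("aws.secretKey", "<secret-key>");

// mirrors the consumer's (schema, properties) construction style
FlinkKinesisProducer<String> producer =
    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
producer.setDefaultStream("my-stream");
{code}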



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221885858
  
Sure, no problem at all. It's just for my own time allocation. Thanks for 
the quick response.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3978) Add contains methods to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3978:
--
Description: 
The javadocs for `RuntimeContext` state that `getAccumulator` "throws an 
exception if the accumulator does not exist or if the accumulator exists, but 
with different type", although `AbstractRuntimeUDFContext` does not throw an 
exception but will return null.

The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
Currently the only way to handle a broadcast variable that may or may not 
exist is to catch and ignore the exception. Adding a 
`containsBroadcastVariable` method to `RuntimeContext` would make this explicit.

  was:
The javadocs for `RuntimeContext` state that `getAccumulator` "throws an 
exception if the accumulator does not exist or if the accumulator exists, but 
with different type", although `AbstractRuntimeUDFContext` does not throw an 
exception but will return null.

The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
Currently the only way to handle a broadcast variable that may or may not 
exist is to catch and ignore the exception. Adding a 
`containsBroadcastVariable` method to `RuntimeContext` would make this 
explicit. Likewise, `containsAccumulator`.


> Add contains methods to RuntimeContext
> --
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for `RuntimeContext` state that `getAccumulator` "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although `AbstractRuntimeUDFContext` does not throw an 
> exception but will return null.
> The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
> Currently the only way to handle a broadcast variable that may or may 
> not exist is to catch and ignore the exception. Adding a 
> `containsBroadcastVariable` method to `RuntimeContext` would make this 
> explicit.
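
For illustration, the guarded access pattern the proposed method would enable 
(the method name is taken from the description; the surrounding code is a 
sketch, not actual Flink API usage):

{code}
// Sketch: an explicit existence check instead of catch-and-ignore.
if (getRuntimeContext().containsBroadcastVariable("number of vertices")) {
    List<Long> bc = getRuntimeContext().getBroadcastVariable("number of vertices");
    long numberOfVertices = bc.get(0);
    // ... use the value ...
} else {
    // fall back to a default instead of swallowing the missing-variable exception
}
{code}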



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302139#comment-15302139
 ] 

ASF GitHub Bot commented on FLINK-3923:
---

Github user aozturk commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221885430
  
Sorry, I have had extra duties this week, but I have already started 
working on it and hope to wrap it up this evening.


> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread aozturk
Github user aozturk commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221885430
  
Sorry, I have had extra duties this week, but I have already started 
working on it and hope to wrap it up this evening.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3978) Add contains methods to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3978:
--
Description: 
The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
exception if the accumulator does not exist or if the accumulator exists, but 
with different type", although {{AbstractRuntimeUDFContext}} does not throw an 
exception but will return null.

The javadocs for {{getBroadcastVariable}} do not mention throwing an exception. 
Currently the only way to handle a broadcast variable that may or may not 
exist is to catch and ignore the exception. Adding a 
{{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
explicit.

  was:
The javadocs for `RuntimeContext` state that `getAccumulator` "throws an 
exception if the accumulator does not exist or if the accumulator exists, but 
with different type", although `AbstractRuntimeUDFContext` does not throw an 
exception but will return null.

The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
Currently the only way to handle a broadcast variable that may or may not 
exist is to catch and ignore the exception. Adding a 
`containsBroadcastVariable` method to `RuntimeContext` would make this explicit.


> Add contains methods to RuntimeContext
> --
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but will return null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3978) Add contains methods to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302133#comment-15302133
 ] 

Greg Hogan commented on FLINK-3978:
---

Thanks! I removed that bit from the description. Only looking at 
{{containsBroadcastVariable}} now.

> Add contains methods to RuntimeContext
> --
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for `RuntimeContext` state that `getAccumulator` "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although `AbstractRuntimeUDFContext` does not throw an 
> exception but will return null.
> The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
> Currently the only way to handle a broadcast variable that may or may 
> not exist is to catch and ignore the exception. Adding a 
> `containsBroadcastVariable` method to `RuntimeContext` would make this 
> explicit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302123#comment-15302123
 ] 

ASF GitHub Bot commented on FLINK-3923:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221882616
  
Just for my planning, when do you think you'll find time to address the 
remaining issues?


> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221882616
  
Just for my planning, when do you think you'll find time to address the 
remaining issues?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3970) How to deal with "resouce isolation" problem

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302120#comment-15302120
 ] 

Stephan Ewen commented on FLINK-3970:
-

Since this is a question, rather than a pending issue, are you okay with 
closing this issue?

> How to deal with "resouce isolation" problem
> 
>
> Key: FLINK-3970
> URL: https://issues.apache.org/jira/browse/FLINK-3970
> Project: Flink
>  Issue Type: Wish
>Reporter: ZhengBowen
>
> For example, when a 'big query' and a 'small query' are executed at the same 
> time, you need to isolate them to prevent the 'big query' from exhausting 
> resources (including I/O, memory, network) so that the 'small query' can 
> complete quickly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release

2016-05-26 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302121#comment-15302121
 ] 

Ted Yu commented on FLINK-2765:
---

API compatibility is maintained.

However, HBASE-15198 caused a regression which is being worked on.

Stay tuned for the next release with the fix.

> Upgrade hbase version for hadoop-2 to 1.2 release
> -
>
> Key: FLINK-2765
> URL: https://issues.apache.org/jira/browse/FLINK-2765
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently 0.98.11 is used:
> {code}
> 0.98.11-hadoop2
> {code}
> The stable release line for hadoop-2 is 1.1.x.
> We should upgrade to 1.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302112#comment-15302112
 ] 

Stephan Ewen commented on FLINK-2765:
-

Did HBase break any APIs, or is the upgrade trivially possible?

> Upgrade hbase version for hadoop-2 to 1.2 release
> -
>
> Key: FLINK-2765
> URL: https://issues.apache.org/jira/browse/FLINK-2765
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently 0.98.11 is used:
> {code}
> 0.98.11-hadoop2
> {code}
> The stable release line for hadoop-2 is 1.1.x.
> We should upgrade to 1.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302107#comment-15302107
 ] 

ASF GitHub Bot commented on FLINK-3908:
---

Github user fpompermaier commented on the pull request:

https://github.com/apache/flink/pull/2007#issuecomment-221880071
  
I don't understand :(
Assuming that `reset` could be renamed to `resetErrorStateAndParse`, the 
other 2 suggestions cannot be applied to my current implementation: if I want 
to rename `parseFieldImpl` to `parseField` I would have to overload the method 
somehow, which is not possible because there's already an abstract method 
called `parseField` with the same signature.
Wrt leaving the responsibility to reset the state of the parsers to the 
classes calling `parseField`, that is quite dangerous IMHO (as you said, 
"GenericCsvInputFormat would call the resetErrorStateAndParse"). 

Am I misunderstanding something?


> FieldParsers error state is not reset correctly to NONE
> ---
>
> Key: FLINK-3908
> URL: https://issues.apache.org/jira/browse/FLINK-3908
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: parser
>
> If during the parsing of a CSV there's a parse error (for example, when an 
> integer column contains non-int values) the errorState is not reset 
> correctly in the next parseField call. A simple fix would be to add as a 
> first statement of the {{parseField()}} function a call to 
> {{setErrorState(ParseErrorState.NONE)}} but it is something that should be 
> handled better (by default) for every subclass of {{FieldParser}}
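
A sketch of the "handled by default" direction discussed in the pull request; 
the method names follow the thread, while the exact signatures and fields are 
assumptions:

{code}
// Sketch: a final entry point resets the error state before every parse,
// so no FieldParser subclass can forget it.
public abstract class FieldParser<T> {

    private ParseErrorState errorState = ParseErrorState.NONE;

    // called by input formats such as GenericCsvInputFormat
    public final int resetErrorStateAndParse(byte[] bytes, int startPos, int limit,
            byte[] delimiter, T reuse) {
        setErrorState(ParseErrorState.NONE);  // the reset happens exactly once, here
        return parseField(bytes, startPos, limit, delimiter, reuse);
    }

    // subclasses implement only the actual parsing logic
    protected abstract int parseField(byte[] bytes, int startPos, int limit,
            byte[] delimiter, T reuse);

    protected void setErrorState(ParseErrorState error) {
        this.errorState = error;
    }
}
{code}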



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3908] Fixed Parser's error state reset

2016-05-26 Thread fpompermaier
Github user fpompermaier commented on the pull request:

https://github.com/apache/flink/pull/2007#issuecomment-221880071
  
I don't understand :(
Assuming that `reset` could be renamed to `resetErrorStateAndParse`, the 
other 2 suggestions cannot be applied to my current implementation: if I want 
to rename `parseFieldImpl` to `parseField` I would have to overload the method 
somehow, which is not possible because there's already an abstract method 
called `parseField` with the same signature.
Wrt leaving the responsibility to reset the state of the parsers to the 
classes calling `parseField`, that is quite dangerous IMHO (as you said, 
"GenericCsvInputFormat would call the resetErrorStateAndParse"). 

Am I misunderstanding something?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3978) Add contains methods to RuntimeContext

2016-05-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302101#comment-15302101
 ] 

Aljoscha Krettek commented on FLINK-3978:
-

For the accumulators, I think the way to go is what is proposed as part of 
this: https://issues.apache.org/jira/browse/FLINK-3758, i.e. change the 
accumulator interface to allow getting an accumulator or adding a new one in 
one method call.
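
For reference, such a combined accessor could have roughly the following 
shape; the name and signature are assumptions based on this discussion, not 
the FLINK-3758 design:

{code}
// Hypothetical "get or create" accessor: returns the registered accumulator,
// or registers and returns the given one if no accumulator with that name exists.
<V, A extends Serializable> Accumulator<V, A> getAccumulator(
        String name, Accumulator<V, A> defaultIfAbsent);
{code}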

> Add contains methods to RuntimeContext
> --
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The javadocs for `RuntimeContext` state that `getAccumulator` "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although `AbstractRuntimeUDFContext` does not throw an 
> exception but will return null.
> The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
> Currently the only way to handle a broadcast variable that may or may 
> not exist is to catch and ignore the exception. Adding a 
> `containsBroadcastVariable` method to `RuntimeContext` would make this 
> explicit. Likewise, `containsAccumulator`.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302095#comment-15302095
 ] 

ASF GitHub Bot commented on FLINK-3806:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2036#discussion_r64749372
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
 ---
@@ -289,6 +300,11 @@ private GatherUdf(GatherFunction 
gatherFunction, TypeInformation numberOfVertices = 
getRuntimeContext().getBroadcastVariable("number of vertices");
+   
this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+   } catch (Exception e) {
+   }
--- End diff --

I think that makes sense.


> Revert use of DataSet.count() in Gelly
> --
>
> Key: FLINK-3806
> URL: https://issues.apache.org/jira/browse/FLINK-3806
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.1.0
>
>
> FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The 
> former returns a {{DataSet}} while the latter executes a job to return a Java 
> value.
> {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and 
> {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and 
> {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the 
> user does not pass the number of vertices as a parameter.
> As noted in FLINK-1632, this does make the code simpler but if my 
> understanding is correct will materialize the Graph twice. The Graph will 
> need to be reread from input, regenerated, or recomputed by preceding 
> algorithms.
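
For illustration, a count can be kept lazy by expressing it as a single-element 
DataSet that is attached as a broadcast variable (a sketch, not the actual 
GraphUtils code):

{code}
// Sketch: no job is triggered here; the sum is computed when the plan runs.
DataSet<Long> numberOfVertices = vertices
    .map(v -> 1L).returns(Long.class)  // one per vertex
    .reduce((a, b) -> a + b);          // lazily summed
// later: someOperator.withBroadcastSet(numberOfVertices, "number of vertices")
{code}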



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3806] [gelly] Revert use of DataSet.cou...

2016-05-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2036#discussion_r64749372
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
 ---
@@ -289,6 +300,11 @@ private GatherUdf(GatherFunction 
gatherFunction, TypeInformation numberOfVertices = 
getRuntimeContext().getBroadcastVariable("number of vertices");
+   
this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+   } catch (Exception e) {
+   }
--- End diff --

I think that makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302081#comment-15302081
 ] 

ASF GitHub Bot commented on FLINK-3908:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/2007#issuecomment-221876052
  
I think I meant it a bit differently than I guess you understood it.

It is basically the exact same implementation you have now, only with 
different method names.


> FieldParsers error state is not reset correctly to NONE
> ---
>
> Key: FLINK-3908
> URL: https://issues.apache.org/jira/browse/FLINK-3908
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: parser
>
> If during the parsing of a CSV there's a parse error (for example, when an 
> integer column contains non-int values) the errorState is not reset 
> correctly in the next parseField call. A simple fix would be to add as a 
> first statement of the {{parseField()}} function a call to 
> {{setErrorState(ParseErrorState.NONE)}} but it is something that should be 
> handled better (by default) for every subclass of {{FieldParser}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3978) Add contains methods to RuntimeContext

2016-05-26 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-3978:
-

 Summary: Add contains methods to RuntimeContext
 Key: FLINK-3978
 URL: https://issues.apache.org/jira/browse/FLINK-3978
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.1.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


The javadocs for `RuntimeContext` state that `getAccumulator` "throws an 
exception if the accumulator does not exist or if the accumulator exists, but 
with different type", although `AbstractRuntimeUDFContext` does not throw an 
exception but will return null.

The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
Currently the only way to handle a broadcast variable that may or may not 
exist is to catch and ignore the exception. Adding a 
`containsBroadcastVariable` method to `RuntimeContext` would make this 
explicit. Likewise, `containsAccumulator`.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3908] Fixed Parser's error state reset

2016-05-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/2007#issuecomment-221876052
  
I think I meant it a bit differently than I guess you understood it.

It is basically the exact same implementation you have now, only with 
different method names.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread dawidwys
Github user dawidwys commented on the pull request:

https://github.com/apache/flink/pull/1981#issuecomment-221875998
  
I applied your changes, @fhueske. If you have any comments regarding the 
equi-join validation, I will address/apply them in the evening.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3868) Specialized CopyableValue serializers and comparators

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302075#comment-15302075
 ] 

ASF GitHub Bot commented on FLINK-3868:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1983#issuecomment-221873359
  
Thanks, Greg, this looks good all in all!

A few things we should do before merging this, in my opinion:
  - I think all classes should be annotated with `@Internal`, because they 
should not be used directly by users.

  - The `NullValueComparator` can be simplified to not really operate on 
the values at all. All NullValues are always the same. They are all equal, hash 
to a constant value, etc. (see the sketch after this list).

  - We were trying to get rid of the `Record` type - it was part of a very 
old legacy API. It is still in there because some people ended up using it, but 
I would like to add as little dependency to it as possible. Hence I'd suggest 
to drop the `RecordSerializer`.
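
Regarding the second point, a minimal sketch of the constant-result comparator 
(illustrative only, not the actual implementation):

{code}
// Every NullValue is identical, so the comparator never inspects the record.
@Override
public int hash(NullValue record) {
    return 53;   // any fixed constant
}

@Override
public int compare(NullValue first, NullValue second) {
    return 0;    // all NullValues are equal
}
{code}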


> Specialized CopyableValue serializers and comparators
> -
>
> Key: FLINK-3868
> URL: https://issues.apache.org/jira/browse/FLINK-3868
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> This need was discussed on the mailing list [1] and will be obviated by code 
> generation for POJO serializers and comparators [2] (as I understand, i.e., 
> {{LongValue}} will now simply be treated as a POJO which happens to contain a 
> {{long}} and a specialized serializer and comparator will be generated).
> In the meantime, and since {{ValueTypeInfo}} will need to be reworked to use 
> the new generators, I think it is worthwhile to add specialized serializers 
> and comparators for the {{CopyableValue}} types.
> This will also provide another point of comparison for the performance of the 
> generated serializers and comparators.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://issues.apache.org/jira/browse/FLINK-3599



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3868] [core] Specialized CopyableValue ...

2016-05-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1983#issuecomment-221873359
  
Thanks, Greg, this looks good all in all!

A few things we should do before merging this, in my opinion:
  - I think all classes should be annotated with `@Internal`, because they 
should not be used directly by users.

  - The `NullValueComparator` can be simplified to not really operate on 
the values at all. All NullValues are always the same. They are all equal, hash 
to a constant value, etc.

  - We were trying to get rid of the `Record` type - it was part of a very 
old legacy API. It is still in there because some people ended up using it, but 
I would like to add as little dependency to it as possible. Hence I'd suggest 
to drop the `RecordSerializer`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1981#discussion_r64742780
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -342,21 +368,29 @@ case class Join(
   }
 
   private def testJoinCondition(expression: Expression): Unit = {
-def checkIfJoinCondition(exp : Expression) = if 
(exp.children.exists(!_.isInstanceOf[JoinFieldReference])) {
-  failValidation(s"Only join predicates supported. For non-join 
predicates use Table#where.")
-}
+def checkIfJoinCondition(exp : BinaryComparison) =
+  if (exp.children match {
--- End diff --

The condition looks good but can be a bit simplified like this:
```
exp.children match {
  case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
    if x.isFromLeftInput != y.isFromLeftInput =>
    // predicate references both inputs. All good!
  case _ => failValidation(
    s"Only join predicates supported. For non-join predicates use Table#where.")
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302050#comment-15302050
 ] 

ASF GitHub Bot commented on FLINK-3655:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1990#issuecomment-221866397
  
Thanks for opening that contribution.

Can you sum up the changes you made? That would make the review easier.
The changes look quite extensive. My gut feeling would be that it should 
not require so many changes, ideally only an additional loop in the "generate 
splits" method, and possibly in the "statistics" method.


> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1981#discussion_r64741369
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -272,23 +272,46 @@ case class Join(
 left.output ++ right.output
   }
 
+  private object JoinFieldReference {
+
+def apply(
+  name: String,
+  resultType: TypeInformation[_],
+  left: LogicalNode,
+  right: LogicalNode): JoinFieldReference = {
+
+  val joinInputField = left.output.zipWithIndex.find(_._1.name == name).map(_._2)
+.orElse(right.output.zipWithIndex.find(_._1.name == name).map(x => left.output.length + x._2))
+.getOrElse(throw new NoSuchElementException(s"""Could not find field: $name"""))
+.asInstanceOf[Int]
+
+  new JoinFieldReference(name, resultType, left, right, joinInputField)
+}
+
+  }
+
   private case class JoinFieldReference(
--- End diff --

I am sorry, I think my last comment was confusing. Actually, I 
misinterpreted the semantics of the `RelBuilder.field(int, int, String)` 
method. I thought it would automatically add the offset of the left input. But 
apparently it doesn't...

How about we change the `JoinFieldReference` to this:

```
private case class JoinFieldReference(
    name: String,
    resultType: TypeInformation[_],
    left: LogicalNode,
    right: LogicalNode) extends Attribute {

  val isFromLeftInput = left.output.map(_.name).contains(name)

  val (indexInInput, indexInJoin) = if (isFromLeftInput) {
    val indexInLeft = left.output.map(_.name).indexOf(name)
    (indexInLeft, indexInLeft)
  } else {
    val indexInRight = right.output.map(_.name).indexOf(name)
    (indexInRight, indexInRight + left.output.length)
  }

  override def toString = s"'$name"

  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    // look up type of field
    val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType()
    // create a new RexInputRef with index offset
    new RexInputRef(indexInJoin, fieldType)
  }

  override def withName(newName: String): Attribute = {
    if (newName == name) {
      this
    } else {
      JoinFieldReference(newName, resultType, left, right)
    }
  }
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302041#comment-15302041
 ] 

ASF GitHub Bot commented on FLINK-3922:
---

Github user fpompermaier commented on the pull request:

https://github.com/apache/flink/pull/2011#issuecomment-221864540
  
From my tests it does, but I think it is something that should have a unit 
test.


> Infinite recursion on TypeExtractor
> ---
>
> Key: FLINK-3922
> URL: https://issues.apache.org/jira/browse/FLINK-3922
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Timo Walther
>Priority: Critical
>
> This program cause a StackOverflow (infinite recursion) in the TypeExtractor:
> {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid}
> public class TypeSerializerStackOverflowOnRecursivePojo {
>   public static class RecursivePojo<K, V> implements Serializable {
>   private static final long serialVersionUID = 1L;
>   
>   private RecursivePojo<K, V> parent;
>   public RecursivePojo(){}
>   public RecursivePojo(K k, V v) {
>   }
>   public RecursivePojo<K, V> getParent() {
>   return parent;
>   }
>   public void setParent(RecursivePojo<K, V> parent) {
>   this.parent = parent;
>   }
>   
>   }
>   public static class TypedTuple extends Tuple3<String, String, RecursivePojo<String, Map<String, String>>> {
>   private static final long serialVersionUID = 1L;
>   }
>   
>   public static void main(String[] args) throws Exception {
>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   env.fromCollection(Arrays.asList(new RecursivePojo<String, Map<String, String>>("test", new HashMap<String, String>())))
>   .map(t -> { TypedTuple ret = new TypedTuple(); ret.setFields("1", "1", t); return ret; }).returns(TypedTuple.class)
>   .print();
>   }
>   }
>   
> }
> {code}
> The thrown Exception is the following:
> {code:title=Exception thrown}
> Exception in thread "main" java.lang.StackOverflowError
>   at 
> sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43)
>   at 
> sun.reflect.generics.repository.AbstractRepository.(AbstractRepository.java:74)
>   at 
> sun.reflect.generics.repository.GenericDeclRepository.(GenericDeclRepository.java:49)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.(ConstructorRepository.java:51)
>   at 
> sun.reflect.generics.repository.MethodRepository.(MethodRepository.java:46)
>   at 
> sun.reflect.generics.repository.MethodRepository.make(MethodRepository.java:59)
>   at java.lang.reflect.Method.getGenericInfo(Method.java:102)
>   at java.lang.reflect.Method.getGenericReturnType(Method.java:255)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1610)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1671)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>   at 
> 
