[jira] [Closed] (FLINK-3970) How to deal with "resource isolation" problem
[ https://issues.apache.org/jira/browse/FLINK-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZhengBowen closed FLINK-3970. - Resolution: Won't Fix > How to deal with "resource isolation" problem > > > Key: FLINK-3970 > URL: https://issues.apache.org/jira/browse/FLINK-3970 > Project: Flink > Issue Type: Wish >Reporter: ZhengBowen > > For example, when a 'big query' and a 'small query' are executed at the same time, you > need to isolate the 'big query' from the 'small query' to prevent the 'big query' from exhausting > resources (including I/O, memory, and network) so that the 'small query' can complete > quickly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-3967 - Flink Sink for Rethink Db
Github user mans2singh commented on the pull request: https://github.com/apache/flink/pull/2031#issuecomment-222030024 Hi @rmetzger Thanks for your advice/suggestions. I will try to answer your questions below: Regarding motivation for the connector - I started working with Flink late last year and found it had some unique features (like a streaming framework built on events from the ground up, and flexible windowing options). I am working through some real-time data flow use cases where these capabilities can streamline our processing pipelines. The integration with RethinkDB came into play because, from the dev perspective, it is schemaless, can ingest streams/batches of data, and has map/reduce functionality. From the ops/scaling perspective it can be scaled/re-sharded in real time without downtime. It can also provide change streams for a table or a document. IMHO, Flink and RethinkDB complement each other for scalable stream processing/analytics use cases. So I thought it might be good to contribute back to the open source community, hence the PR. Regarding guidelines for code contribution - I did go through the document and thought this PR would be in the same vein as the other streaming connectors (Kafka, etc.), complementing them. There was no new API or change in interfaces, and the code base is pretty lightweight because Flink and RethinkDB both make it easy to integrate. However, I did not realize that it would be considered a new feature and require a design review; perhaps that was my oversight. In any case, I really appreciate the time you and your team took to review the PR and advise me. Please let me know your recommendation on how I should proceed. Thanks, Mans --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink
[ https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303238#comment-15303238 ] ASF GitHub Bot commented on FLINK-3967: --- Github user mans2singh commented on the pull request: https://github.com/apache/flink/pull/2031#issuecomment-222030024 Hi @rmetzger Thanks for your advice/suggestions. I will try to answer your questions below: Regarding motivation for the connector - I started working with Flink late last year and found it had some unique features (like a streaming framework built on events from the ground up, and flexible windowing options). I am working through some real-time data flow use cases where these capabilities can streamline our processing pipelines. The integration with RethinkDB came into play because, from the dev perspective, it is schemaless, can ingest streams/batches of data, and has map/reduce functionality. From the ops/scaling perspective it can be scaled/re-sharded in real time without downtime. It can also provide change streams for a table or a document. IMHO, Flink and RethinkDB complement each other for scalable stream processing/analytics use cases. So I thought it might be good to contribute back to the open source community, hence the PR. Regarding guidelines for code contribution - I did go through the document and thought this PR would be in the same vein as the other streaming connectors (Kafka, etc.), complementing them. There was no new API or change in interfaces, and the code base is pretty lightweight because Flink and RethinkDB both make it easy to integrate. However, I did not realize that it would be considered a new feature and require a design review; perhaps that was my oversight. In any case, I really appreciate the time you and your team took to review the PR and advise me. Please let me know your recommendation on how I should proceed. 
Thanks, Mans > Provide RethinkDB Sink for Flink > > > Key: FLINK-3967 > URL: https://issues.apache.org/jira/browse/FLINK-3967 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 1.0.3 > Environment: All >Reporter: Mans Singh >Assignee: Mans Singh >Priority: Minor > Labels: features > Fix For: 1.1.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Provide a sink to stream data from Flink to RethinkDB.
[jira] [Commented] (FLINK-3965) Delegating GraphAlgorithm
[ https://issues.apache.org/jira/browse/FLINK-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302926#comment-15302926 ] ASF GitHub Bot commented on FLINK-3965: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/2032#issuecomment-221993882 There is more discussion in the ticket description, but for Gelly the idea is to keep algorithms small and discrete yet not duplicate computation. My first take on this merely cached algorithm outputs and if the configurations were "equal" would use the prior result. This second take is able to merge configurations which is much more powerful. We can do this because `DataSet`s are lazily evaluated and we can replace the old `DataSet` when we want to change how we generate the result. We "replace" the `DataSet` by actually wrapping it in a proxy class for which the `MethodHandler` always defers to the replaceable `DataSet`. > Delegating GraphAlgorithm > - > > Key: FLINK-3965 > URL: https://issues.apache.org/jira/browse/FLINK-3965 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Complex and related algorithms often overlap in computation of data. Two such > examples are: > 1) the local and global clustering coefficients each use a listing of > triangles > 2) the local clustering coefficient joins on vertex degree, and the > underlying triangle listing annotates edge degree which uses vertex degree > We can reuse and rewrite algorithm output by creating a {{ProxyObject}} as a > delegate for method calls to the {{DataSet}} returned by the algorithm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3965] [gelly] Delegating GraphAlgorithm
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/2032#issuecomment-221993882 There is more discussion in the ticket description, but for Gelly the idea is to keep algorithms small and discrete yet not duplicate computation. My first take on this merely cached algorithm outputs and if the configurations were "equal" would use the prior result. This second take is able to merge configurations which is much more powerful. We can do this because `DataSet`s are lazily evaluated and we can replace the old `DataSet` when we want to change how we generate the result. We "replace" the `DataSet` by actually wrapping it in a proxy class for which the `MethodHandler` always defers to the replaceable `DataSet`.
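The delegation idea described above can be sketched in standalone Java. This sketch uses a JDK dynamic proxy over a plain List rather than Gelly's actual classes (the comment mentions a javassist `MethodHandler`; `DelegatingList` and its field names here are illustrative only): every call on the proxy defers to a replaceable target, so swapping the target "replaces" the result without changing the handle callers hold.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

public class DelegatingList {
    static final class Handler implements InvocationHandler {
        List<Integer> target; // the replaceable delegate

        Handler(List<Integer> target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // every call on the proxy defers to whatever the current target is
            return method.invoke(target, args);
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Handler handler = new Handler(Arrays.asList(1, 2, 3));
        List<Integer> view = (List<Integer>) Proxy.newProxyInstance(
                DelegatingList.class.getClassLoader(),
                new Class<?>[] { List.class },
                handler);

        System.out.println(view.size());      // reflects the original target: 3
        handler.target = Arrays.asList(4, 5); // "replace" the underlying result
        System.out.println(view.size());      // same proxy now reports: 2
    }
}
```

Because the swap happens behind the proxy, code that already holds `view` transparently sees the merged/regenerated result, which is what makes lazy `DataSet` replacement possible.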
[jira] [Updated] (FLINK-3975) Override baseurl when serving docs locally
[ https://issues.apache.org/jira/browse/FLINK-3975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dyana Rose updated FLINK-3975: -- Summary: Override baseurl when serving docs locally (was: docs build script isn't serving the preview on the correct base url) > Override baseurl when serving docs locally > -- > > Key: FLINK-3975 > URL: https://issues.apache.org/jira/browse/FLINK-3975 > Project: Flink > Issue Type: Bug > Components: Documentation > Environment: centos 7.2, jekyll 3.1.6, ruby 2.3.1 >Reporter: Dyana Rose >Priority: Trivial > > when running the documentation build script as: > {{./build_docs.sh -p}} the docs are built using the correctly overridden > baseurl, but they are then served from the wrong url making it a wee bit more > difficult than intended to review changes locally. > The following is the output from running the script: > {quote} > Configuration file: _config.yml > Configuration file: _local_preview_conf.yml > Source: /vagrant/flink/docs >Destination: /vagrant/flink/docs/target > Incremental build: disabled. Enable with --incremental > Generating... > /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57: > warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode > 040777 > done in 16.736 seconds. > Auto-regeneration: enabled for '/vagrant/flink/docs' > Configuration file: /vagrant/flink/docs/_config.yml > Server address: > http://0.0.0.0:4000//ci.apache.org/projects/flink/flink-docs-master/ > Server running... press ctrl-c to stop. > {quote} > As you see it looks to be using both config files to build, but only the > default config file to serve. > This can be fixed by just removing the {{_local_preview_conf.yml}} file and > instead specify the baseurl as an option to the serve command, so it becomes > {{serve --config _config.yml --baseurl= --watch}}. 
Giving an output of: > {quote} > Configuration file: _config.yml > Source: /vagrant/flink/docs >Destination: /vagrant/flink/docs/target > Incremental build: disabled. Enable with --incremental > Generating... > /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57: > warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode > 040777 > done in 15.928 seconds. > Auto-regeneration: enabled for '/vagrant/flink/docs' > Configuration file: /vagrant/flink/docs/_config.yml > Server address: http://0.0.0.0:4000/ > Server running... press ctrl-c to stop. > {quote}
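The fix the ticket proposes amounts to a one-line serve invocation, sketched here as it might appear in build_docs.sh (assumptions: jekyll is on PATH and is run from the docs directory; this is the ticket's suggested command, not the merged script verbatim):

```shell
# Serve with an explicitly empty baseurl instead of layering a second
# config file, matching what build_docs.bat already does on Windows.
jekyll serve --config _config.yml --baseurl= --watch
```

Passing `--baseurl=` on the command line overrides the production baseurl from _config.yml for both the build and the serve step, which is why the `_local_preview_conf.yml` override file becomes unnecessary.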
[jira] [Commented] (FLINK-3975) docs build script isn't serving the preview on the correct base url
[ https://issues.apache.org/jira/browse/FLINK-3975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302897#comment-15302897 ] ASF GitHub Bot commented on FLINK-3975: --- GitHub user dyanarose opened a pull request: https://github.com/apache/flink/pull/2040 [FLINK-3975] [docs] Override baseurl when serving docs locally Updating build_docs.sh to serve the docs locally with the correct url. This change makes build_docs.sh run the equivalent serve command to build_docs.bat. removing now unnecessary config file. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dyanarose/flink FLINK-3975 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2040.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2040 commit 96af1da4e4770f2e6ed18d8413a6ea106927606e Author: Dyana RoseDate: 2016-05-26T20:40:07Z [FLINK-3975] [docs] Override baseurl when serving docs locally > docs build script isn't serving the preview on the correct base url > --- > > Key: FLINK-3975 > URL: https://issues.apache.org/jira/browse/FLINK-3975 > Project: Flink > Issue Type: Bug > Components: Documentation > Environment: centos 7.2, jekyll 3.1.6, ruby 2.3.1 >Reporter: Dyana Rose >Priority: Trivial > > when running the documentation build script as: > {{./build_docs.sh -p}} the docs are built using the correctly overridden > baseurl, but they are then served from the wrong url making it a wee bit more > difficult than intended to review changes locally. > The following is the output from running the script: > {quote} > Configuration file: _config.yml > Configuration file: _local_preview_conf.yml > Source: /vagrant/flink/docs >Destination: /vagrant/flink/docs/target > Incremental build: disabled. Enable with --incremental > Generating... 
> /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57: > warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode > 040777 > done in 16.736 seconds. > Auto-regeneration: enabled for '/vagrant/flink/docs' > Configuration file: /vagrant/flink/docs/_config.yml > Server address: > http://0.0.0.0:4000//ci.apache.org/projects/flink/flink-docs-master/ > Server running... press ctrl-c to stop. > {quote} > As you see it looks to be using both config files to build, but only the > default config file to serve. > This can be fixed by just removing the {{_local_preview_conf.yml}} file and > instead specify the baseurl as an option to the serve command, so it becomes > {{serve --config _config.yml --baseurl= --watch}}. Giving an output of: > {quote} > Configuration file: _config.yml > Source: /vagrant/flink/docs >Destination: /vagrant/flink/docs/target > Incremental build: disabled. Enable with --incremental > Generating... > /home/vagrant/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/pygments.rb-0.6.3/lib/pygments/popen.rb:57: > warning: Insecure world writable dir /opt/scala-2.11.8/bin in PATH, mode > 040777 > done in 15.928 seconds. > Auto-regeneration: enabled for '/vagrant/flink/docs' > Configuration file: /vagrant/flink/docs/_config.yml > Server address: http://0.0.0.0:4000/ > Server running... press ctrl-c to stop. > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3975] [docs] Override baseurl when serv...
GitHub user dyanarose opened a pull request: https://github.com/apache/flink/pull/2040 [FLINK-3975] [docs] Override baseurl when serving docs locally Updating build_docs.sh to serve the docs locally with the correct URL. This change makes build_docs.sh run the equivalent serve command to build_docs.bat, and removes the now-unnecessary config file. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dyanarose/flink FLINK-3975 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2040.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2040 commit 96af1da4e4770f2e6ed18d8413a6ea106927606e Author: Dyana Rose Date: 2016-05-26T20:40:07Z [FLINK-3975] [docs] Override baseurl when serving docs locally
[jira] [Updated] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3978: -- Fix Version/s: 1.1.0 > Add hasBroadcastVariable method to RuntimeContext > - > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit.
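The difference the ticket is after can be shown with a small standalone mock. These are not Flink's actual classes (the map-backed `BroadcastCheck` is illustrative only), but the two access patterns mirror the quoted description: catching the exception versus asking first with the proposed `hasBroadcastVariable`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastCheck {
    static final Map<String, List<?>> broadcastVars = new HashMap<>();

    // the explicit existence check this ticket proposes
    static boolean hasBroadcastVariable(String name) {
        return broadcastVars.containsKey(name);
    }

    @SuppressWarnings("unchecked")
    static <T> List<T> getBroadcastVariable(String name) {
        if (!broadcastVars.containsKey(name)) {
            // mirrors the runtime throwing when the variable was never set
            throw new IllegalArgumentException("Broadcast variable '" + name + "' not set");
        }
        return (List<T>) broadcastVars.get(name);
    }

    public static void main(String[] args) {
        broadcastVars.put("degrees", Arrays.asList(1L, 2L));

        // proposed style: ask first, no catch-and-ignore needed
        List<Long> degrees = hasBroadcastVariable("degrees")
                ? getBroadcastVariable("degrees") : Collections.emptyList();
        List<Long> missing = hasBroadcastVariable("absent")
                ? getBroadcastVariable("absent") : Collections.emptyList();

        System.out.println(degrees.size() + " " + missing.size()); // prints: 2 0
    }
}
```

The point is that the optional-variable case becomes an ordinary conditional rather than control flow through an exception whose type and message are implementation details.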
[jira] [Commented] (FLINK-3910) New self-join operator
[ https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302853#comment-15302853 ] Greg Hogan commented on FLINK-3910: --- I see this case as part of flushing out the remaining join operators. Flink could get by with {{map}}, {{reduce}}, and {{join}} but we are kindly given additional operators for clarity and performance. Outer joins have been quite useful despite that we could instead use {{coGroup}}. anti- and semi-joins would be similarly useful but are for now just comments in code. {{selfJoin}} can have a large impact on performance. A {{reduce}} is {{O(n)}} but a join is {{O(n^2)}} so data skew has a much larger effect. How would extension classes contrast with simply marking methods as {{@PublicEvolving}}? I do see that it may be desirable to defer major features to the next release when there is insufficient time to settle. > New self-join operator > -- > > Key: FLINK-3910 > URL: https://issues.apache.org/jira/browse/FLINK-3910 > Project: Flink > Issue Type: New Feature > Components: DataSet API, Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Flink currently provides inner- and outer-joins as well as cogroup and the > non-keyed cross. {{JoinOperator}} hints at future support for semi- and > anti-joins. > Many Gelly algorithms perform a self-join [0]. Still pending reviews, > FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java > and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. > A {{SelfJoinHint}} will select between skewed and non-skewed implementations. > The object-reuse-disabled case can be simply handled with a new {{Operator}}. > The object-reuse-enabled case requires either {{CopyableValue}} types (as in > the code above) or a custom driver which has access to the serializer (or > making the serializer accessible to rich functions, and I think there be > dragons). 
> If the idea of a self-join is agreeable, I'd like to work out a rough > implementation and go from there. > [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join
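What a self-join computes, and why the comment above contrasts its O(n^2) cost with an O(n) reduce, can be illustrated in plain Java collections (no Flink; `SelfJoinSketch` and the toy edge list are illustrative only): joining an edge list with itself on target == source yields the two-hop paths, and the nested loop makes the quadratic pairing under skew visible.

```java
import java.util.ArrayList;
import java.util.List;

public class SelfJoinSketch {
    public static void main(String[] args) {
        // directed edges as (source, target) pairs
        int[][] edges = { {1, 2}, {2, 3}, {2, 4} };

        List<int[]> twoPaths = new ArrayList<>();
        for (int[] left : edges) {
            for (int[] right : edges) {          // O(n^2) pairing per join key
                if (left[1] == right[0]) {       // join key: left.target == right.source
                    twoPaths.add(new int[] { left[0], left[1], right[1] });
                }
            }
        }
        System.out.println(twoPaths.size());     // paths 1-2-3 and 1-2-4, so prints: 2
    }
}
```

A high-degree vertex contributes the square of its degree to the output, which is why a {{SelfJoinHint}} choosing between skewed and non-skewed implementations matters far more here than for a reduce.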
[jira] [Commented] (FLINK-3910) New self-join operator
[ https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302852#comment-15302852 ] Greg Hogan commented on FLINK-3910: --- I see this case as part of flushing out the remaining join operators. Flink could get by with `map`, `reduce`, and `join` but we are kindly given additional operators for clarity and performance. Outer joins have been quite useful despite that we could instead use `coGroup`. anti- and semi-joins would be similarly useful but are for now just comments in code. `selfJoin` can have a large impact on performance. A `reduce` is `O(n)` but a join is `O(n^2)` so data skew has a much larger effect. How would extension classes contrast with simply marking methods as `@PublicEvolving`? I do see that it may be desirable to defer major features to the next release when there is insufficient time to settle. > New self-join operator > -- > > Key: FLINK-3910 > URL: https://issues.apache.org/jira/browse/FLINK-3910 > Project: Flink > Issue Type: New Feature > Components: DataSet API, Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Flink currently provides inner- and outer-joins as well as cogroup and the > non-keyed cross. {{JoinOperator}} hints at future support for semi- and > anti-joins. > Many Gelly algorithms perform a self-join [0]. Still pending reviews, > FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java > and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. > A {{SelfJoinHint}} will select between skewed and non-skewed implementations. > The object-reuse-disabled case can be simply handled with a new {{Operator}}. > The object-reuse-enabled case requires either {{CopyableValue}} types (as in > the code above) or a custom driver which has access to the serializer (or > making the serializer accessible to rich functions, and I think there be > dragons). 
> If the idea of a self-join is agreeable, I'd like to work out a rough > implementation and go from there. > [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join
[jira] [Issue Comment Deleted] (FLINK-3910) New self-join operator
[ https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3910: -- Comment: was deleted (was: I see this case as part of flushing out the remaining join operators. Flink could get by with `map`, `reduce`, and `join` but we are kindly given additional operators for clarity and performance. Outer joins have been quite useful despite that we could instead use `coGroup`. anti- and semi-joins would be similarly useful but are for now just comments in code. `selfJoin` can have a large impact on performance. A `reduce` is `O(n)` but a join is `O(n^2)` so data skew has a much larger effect. How would extension classes contrast with simply marking methods as `@PublicEvolving`? I do see that it may be desirable to defer major features to the next release when there is insufficient time to settle.) > New self-join operator > -- > > Key: FLINK-3910 > URL: https://issues.apache.org/jira/browse/FLINK-3910 > Project: Flink > Issue Type: New Feature > Components: DataSet API, Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Flink currently provides inner- and outer-joins as well as cogroup and the > non-keyed cross. {{JoinOperator}} hints at future support for semi- and > anti-joins. > Many Gelly algorithms perform a self-join [0]. Still pending reviews, > FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java > and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. > A {{SelfJoinHint}} will select between skewed and non-skewed implementations. > The object-reuse-disabled case can be simply handled with a new {{Operator}}. > The object-reuse-enabled case requires either {{CopyableValue}} types (as in > the code above) or a custom driver which has access to the serializer (or > making the serializer accessible to rich functions, and I think there be > dragons). 
> If the idea of a self-join is agreeable, I'd like to work out a rough > implementation and go from there. > [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join
[jira] [Commented] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302849#comment-15302849 ] ASF GitHub Bot commented on FLINK-3978: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2039 [FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext New method RuntimeContext.hasBroadcastVariable(String). You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3978_add_hasbroadcastvariable_method_to_runtimecontext Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2039.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2039 commit 39391e07f075d5e1bd2f791da4c610d766968b94 Author: Greg HoganDate: 2016-05-26T18:45:00Z [FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext New method RuntimeContext.hasBroadcastVariable(String). > Add hasBroadcastVariable method to RuntimeContext > - > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3978] [core] Add hasBroadcastVariable m...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2039 [FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext New method RuntimeContext.hasBroadcastVariable(String). You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3978_add_hasbroadcastvariable_method_to_runtimecontext Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2039.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2039 commit 39391e07f075d5e1bd2f791da4c610d766968b94 Author: Greg Hogan Date: 2016-05-26T18:45:00Z [FLINK-3978] [core] Add hasBroadcastVariable method to RuntimeContext New method RuntimeContext.hasBroadcastVariable(String).
[jira] [Created] (FLINK-3980) Remove ExecutionConfig.PARALLELISM_UNKNOWN
Greg Hogan created FLINK-3980: - Summary: Remove ExecutionConfig.PARALLELISM_UNKNOWN Key: FLINK-3980 URL: https://issues.apache.org/jira/browse/FLINK-3980 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 FLINK-3589 added {{ExecutionConfig.PARALLELISM_DEFAULT}} and {{ExecutionConfig.PARALLELISM_UNKNOWN}}. The former gave a name to the constant {{-1}} and the latter was used as a default no-op when setting the parallelism. It's nice to keep these intents separate, but given the current implementation of Operator parallelism, users can get by using {{PARALLELISM_DEFAULT}}.
[jira] [Commented] (FLINK-3589) Allow setting Operator parallelism to default value
[ https://issues.apache.org/jira/browse/FLINK-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302712#comment-15302712 ] ASF GitHub Bot commented on FLINK-3589: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1778#issuecomment-221965805 It's good to have you back :) I think we can get by with only `PARALLELISM_DEFAULT` since `java.operators.Operator.setParallelism(int)` is only called by the Scala and Python APIs. I'll open a ticket. > Allow setting Operator parallelism to default value > --- > > Key: FLINK-3589 > URL: https://issues.apache.org/jira/browse/FLINK-3589 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > Users can override the parallelism for a single operator by calling > {{Operator.setParallelism}}, which accepts a positive value. {{Operator}} > uses {{-1}} to indicate default parallelism. It would be nice to name and > accept this default value. > This would enable user algorithms to allow configurable parallelism while > still chaining operator methods. > For example, currently: > {code} > private int parallelism; > ... > public void setParallelism(int parallelism) { > this.parallelism = parallelism; > } > ... > MapOperator, Edge > newEdges = > edges > .map(new MyMapFunction()) > .name("My map function"); > if (parallelism > 0) { > newEdges.setParallelism(parallelism); > } > {code} > Could be simplified to: > {code} > private int parallelism = Operator.DEFAULT_PARALLELISM; > ... > public void setParallelism(int parallelism) { > this.parallelism = parallelism; > } > ... > DataSet > newEdges = edges > .map(new MyMapFunction()) > .setParallelism(parallelism) > .name("My map function"); > {code}
[GitHub] flink pull request: [FLINK-3589] Allow setting Operator parallelis...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1778#issuecomment-221965805 It's good to have you back :) I think we can get by with only `PARALLELISM_DEFAULT` since `java.operators.Operator.setParallelism(int)` is only called by the Scala and Python APIs. I'll open a ticket.
[jira] [Commented] (FLINK-3969) Log Exceptions Thrown by Invokable
[ https://issues.apache.org/jira/browse/FLINK-3969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302682#comment-15302682 ] Stephan Ewen commented on FLINK-3969: - The log statement should probably go in line 612. https://github.com/apache/flink/blob/release-1.0.2-rc3/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L612 > Log Exceptions Thrown by Invokable > -- > > Key: FLINK-3969 > URL: https://issues.apache.org/jira/browse/FLINK-3969 > Project: Flink > Issue Type: Improvement >Reporter: Aljoscha Krettek > > Currently, in {{Task.run}} the exceptions thrown by > {{AbstractInvokable.invoke()}} are not logged. They only appear in the web > front-end or are forwarded to the client. > [~davidkim] reported this problem on the mailing list.
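The shape of the proposed fix is simply "log first, then forward," and can be sketched in isolation (the exact placement in {{Task.run}} is per the comment above; `LogAndForward` and `runInvokable` are illustrative names, not Flink's real code, and a real fix would use the task's SLF4J logger rather than stdout):

```java
public class LogAndForward {
    public static void main(String[] args) {
        try {
            runInvokable(); // stands in for AbstractInvokable.invoke()
        } catch (Exception e) {
            // proposed addition: log the cause so it lands in the TaskManager
            // log, not only in the web frontend or the client ...
            System.out.println("Task execution failed: " + e.getMessage());
            // ... then fail the task and forward the exception as before
        }
    }

    static void runInvokable() throws Exception {
        throw new Exception("boom");
    }
}
```

Logging before the existing failure handling means no behavior changes for the client; the same exception is still forwarded afterwards.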
[jira] [Commented] (FLINK-3589) Allow setting Operator parallelism to default value
[ https://issues.apache.org/jira/browse/FLINK-3589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302673#comment-15302673 ] ASF GitHub Bot commented on FLINK-3589: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1778#issuecomment-221960487 Couldn't that be equally well checked in the program that calls `setParallelism(...)`? Just asking because this is handled only in very few parts of the DataSet API anyways (not at all handled in streaming) and seems a bit of a special case. You may notice, I am starting to ask this question about "what needs to be part of the API, what can be part of the user program" quite a bit lately ;-) Trying to be conscious about complexity and commitment to maintaining all the API contructs. > Allow setting Operator parallelism to default value > --- > > Key: FLINK-3589 > URL: https://issues.apache.org/jira/browse/FLINK-3589 > Project: Flink > Issue Type: Improvement > Components: Java API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > User's can override the parallelism for a single operator by calling > {{Operator.setParallelism}}, which accepts a positive value. {{Operator}} > uses {{-1}} to indicate default parallelism. It would be nice to name and > accept this default value. > This would enable user algorithms to allow configurable parallelism while > still chaining operator methods. > For example, currently: > {code} > private int parallelism; > ... > public void setParallelism(int parallelism) { > this.parallelism = parallelism; > } > ... > MapOperator, Edge > newEdges = > edges > .map(new MyMapFunction()) > .name("My map function"); > if (parallelism > 0) { > newEdges.setParallelism(parallelism); > } > {code} > Could be simplified to: > {code} > private int parallelism = Operator.DEFAULT_PARALLELISM; > ... > public void setParallelism(int parallelism) { > this.parallelism = parallelism; > } > ... 
> DataSet > newEdges = edges > .map(new MyMapFunction()) > .setParallelism(parallelism) > .name("My map function"); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
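The sentinel-default pattern that the ticket proposes can be sketched independently of Flink's API (the class and method names below are illustrative, not Flink's actual `Operator`):

```java
// Illustrative sketch (not Flink's API): a named sentinel replaces the bare -1,
// and setParallelism accepts it, so call chains need no conditional.
class OperatorSketch {
    static final int DEFAULT_PARALLELISM = -1;

    private int parallelism = DEFAULT_PARALLELISM;

    OperatorSketch setParallelism(int parallelism) {
        if (parallelism <= 0 && parallelism != DEFAULT_PARALLELISM) {
            throw new IllegalArgumentException(
                "parallelism must be positive or DEFAULT_PARALLELISM");
        }
        this.parallelism = parallelism;
        return this; // returning this preserves the chaining style from the example
    }

    /** Resolve against the environment default, as the optimizer would at plan time. */
    int effectiveParallelism(int environmentDefault) {
        return parallelism == DEFAULT_PARALLELISM ? environmentDefault : parallelism;
    }
}
```

Because the sentinel is an accepted argument, `setParallelism(parallelism)` can always appear in the chain, which is exactly the simplification the issue description shows.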
[GitHub] flink pull request: [FLINK-3589] Allow setting Operator parallelis...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1778#issuecomment-221960487 Couldn't that be equally well checked in the program that calls `setParallelism(...)`? Just asking because this is handled only in very few parts of the DataSet API anyway (not at all handled in streaming) and seems a bit of a special case. You may notice, I am starting to ask this question about "what needs to be part of the API, what can be part of the user program" quite a bit lately ;-) Trying to be conscious about complexity and commitment to maintaining all the API constructs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3978: -- Summary: Add hasBroadcastVariable method to RuntimeContext (was: Add containsBroadcastVariable method to RuntimeContext) > Add hasBroadcastVariable method to RuntimeContext > - > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit.
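The ergonomics gap the ticket describes can be shown with a plain map standing in for the broadcast-variable registry (a hypothetical class, not Flink's `RuntimeContext`):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry mirroring the get-vs-has contrast from the ticket:
// get throws for a missing name, has makes the existence check explicit.
class BroadcastRegistry {
    private final Map<String, Object> variables = new HashMap<>();

    void register(String name, Object value) {
        variables.put(name, value);
    }

    Object getBroadcastVariable(String name) {
        if (!variables.containsKey(name)) {
            throw new IllegalArgumentException("no broadcast variable named " + name);
        }
        return variables.get(name);
    }

    // The proposed addition: callers branch on this instead of catching the exception.
    boolean hasBroadcastVariable(String name) {
        return variables.containsKey(name);
    }
}
```

With the has-method, a user function can write an ordinary if-check instead of the catch-and-ignore workaround the issue description complains about.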
[jira] [Commented] (FLINK-3723) Aggregate Functions and scalar expressions shouldn't be mixed in select
[ https://issues.apache.org/jira/browse/FLINK-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302667#comment-15302667 ] Stephan Ewen commented on FLINK-3723: - Very tricky. The relationship to SQL is quite a strong argument, for everyone coming from SQL but wanting something that embeds better. If one wants to select after {{agg}}, one probably also writes a lot of redundant lines. Finally, one comment on fields that are neither groupBy nor aggregated: In core SQL, these should not exist, but many dialects have them. It is a bit as if one selects some distinct values and some non-distinct values. > Aggregate Functions and scalar expressions shouldn't be mixed in select > --- > > Key: FLINK-3723 > URL: https://issues.apache.org/jira/browse/FLINK-3723 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.1 >Reporter: Yijie Shen >Priority: Critical > > When we type {code}select deptno, name, max(age) from dept group by > deptno;{code} in calcite or Oracle, it will complain {code}Expression 'NAME' > is not being grouped{code} or {code}Column 'dept.name' is invalid in the > select list because it is not contained in either an aggregate function or > the GROUP BY clause.{code} because of the nondeterministic result. > Therefore, I suggest separating the current functionality of `select` into > two APIs: the new `select` only handles scalar expressions, and an `agg` accepts > Aggregates. > {code} > def select(exprs: Expression*) > def agg(aggs: Aggregation*) > > tbl.groupBy('deptno) >.agg('age.max, 'age.min) > {code}
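The ambiguity that SQL engines reject can be reproduced with plain collections (the data below is illustrative, not from the issue): after grouping by deptno, max(age) collapses each group to one value, but each group still holds several names, so selecting a bare name has no unique answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rows of (deptno, name, age) grouped by deptno: an aggregate yields one value
// per group, while a non-grouped scalar column does not.
class GroupByAmbiguity {
    static class Row {
        final int deptno; final String name; final int age;
        Row(int deptno, String name, int age) {
            this.deptno = deptno; this.name = name; this.age = age;
        }
    }

    // Well-defined: one max age per department.
    static Map<Integer, Integer> maxAgeByDept(List<Row> rows) {
        Map<Integer, Integer> maxAge = new HashMap<>();
        for (Row r : rows) {
            maxAge.merge(r.deptno, r.age, Math::max);
        }
        return maxAge;
    }

    // Ambiguous: several candidate names per department, no single answer.
    static Map<Integer, List<String>> namesByDept(List<Row> rows) {
        Map<Integer, List<String>> names = new HashMap<>();
        for (Row r : rows) {
            names.computeIfAbsent(r.deptno, k -> new ArrayList<>()).add(r.name);
        }
        return names;
    }
}
```

Splitting the Table API into `select` (scalar) and `agg` (aggregates) makes this distinction a compile-time property of the API rather than a runtime validation error.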
[jira] [Commented] (FLINK-3910) New self-join operator
[ https://issues.apache.org/jira/browse/FLINK-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302656#comment-15302656 ] Stephan Ewen commented on FLINK-3910: - Let me play devil's advocate here: Every time we add new specialized constructs, we make our lives harder by committing to maintaining them. There are two things to think about: - How much difference do these three self-join variants make? Enough to have them in the core? Or enough to have them in Utils or Gelly directly? - How deeply do we want to embed them in the DataSet API? Anything added is virtually impossible to remove, and a very long-term commitment. Given the plethora of things one could add to the APIs, the cost of maintenance, and the benefit of a concise API, I think we need to start thinking about a staging process. We could add constructs not directly to the API, but to something like extension classes. Then, once a construct gets used very often (let's track this by users that look for that operation but do not find the extension class), we start moving it to the core API. That would sort of act as a steering process that leads to having the common and frequent operations in the core API, and the more specialized ones in extension classes. What do you think? > New self-join operator > -- > > Key: FLINK-3910 > URL: https://issues.apache.org/jira/browse/FLINK-3910 > Project: Flink > Issue Type: New Feature > Components: DataSet API, Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Flink currently provides inner- and outer-joins as well as cogroup and the > non-keyed cross. {{JoinOperator}} hints at future support for semi- and > anti-joins. > Many Gelly algorithms perform a self-join [0]. Still pending reviews, > FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java > and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. 
> A {{SelfJoinHint}} will select between skewed and non-skewed implementations. > The object-reuse-disabled case can be simply handled with a new {{Operator}}. > The object-reuse-enabled case requires either {{CopyableValue}} types (as in > the code above) or a custom driver which has access to the serializer (or > making the serializer accessible to rich functions, and I think there be > dragons). > If the idea of a self-join is agreeable, I'd like to work out a rough > implementation and go from there. > [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join
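What a keyed self-join computes can be sketched over plain collections (an illustrative shape, not the proposed Flink operator); the sketch also shows why the ticket distinguishes skewed from non-skewed inputs: a key with n records emits n*(n-1) ordered pairs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Naive keyed self-join: every ordered pair of distinct values sharing a key.
class SelfJoinSketch {
    /** rows are {key, value}; output triples are {key, value1, value2}. */
    static List<int[]> selfJoin(List<int[]> rows) {
        Map<Integer, List<Integer>> byKey = new HashMap<>();
        for (int[] row : rows) {
            byKey.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        List<int[]> out = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> group : byKey.entrySet()) {
            // Quadratic in the group size: this is where key skew hurts.
            for (int v1 : group.getValue()) {
                for (int v2 : group.getValue()) {
                    if (v1 != v2) {
                        out.add(new int[]{group.getKey(), v1, v2});
                    }
                }
            }
        }
        return out;
    }
}
```

A dedicated operator can avoid materializing each group twice, which a plain `join` of a dataset with itself cannot easily do.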
[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302624#comment-15302624 ] Aljoscha Krettek commented on FLINK-3758: - There might be one, I don't know too much about it, unfortunately. Maybe [~Zentol] or [~rmetzger] could help us with that. About whether it will replace the accumulators, I'm not sure. For me the accumulators always seemed the right tool for monitoring so maybe they will be replaced by something better and then be deprecated at some point. [~StephanEwen] and I recently had a short discussion about that, maybe he'd also like to chime in on this. > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator.
[jira] [Commented] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302570#comment-15302570 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221944267 @subhankarb @rmetzger It's actually a good question... Should we push this further? Otherwise this PR is getting old. @subhankarb : if you update this PR and want feedback, you need to add a comment (best add the requested reviewers via @) to make sure to get attention. I just had a look and one comment about logging the exception cause is still not addressed. Can you fix this and maybe rebase to current master? > Redis Sink Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221944267 @subhankarb @rmetzger It's actually a good question... Should we push this further? Otherwise this PR is getting old. @subhankarb : if you update this PR and want feedback, you need to add a comment (best add the requested reviewers via @) to make sure to get attention. I just had a look and one comment about logging the exception cause is still not addressed. Can you fix this and maybe rebase to current master?
[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302566#comment-15302566 ] Robert Metzger commented on FLINK-3190: --- [~packet], since you've requested this feature, would you be available to review and test the pull request? > Retry rate limits for DataStream API > > > Key: FLINK-3190 > URL: https://issues.apache.org/jira/browse/FLINK-3190 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Michał Fijołek >Priority: Minor > > For a long running stream processing job, absolute numbers of retries don't > make much sense: The job will accumulate transient errors over time and will > die eventually when thresholds are exceeded. Rate limits are better suited in > this scenario: A job should only die, if it fails too often in a given time > frame. To better overcome transient errors, retry delays could be used, as > suggested in other issues. > Absolute numbers of retries can still make sense, if failing operators don't > make any progress at all. We can measure progress by OperatorState changes > and by observing output, as long as the operator in question is not a sink. > If operator state changes and/or operator produces output, we can assume it > makes progress. > As an example, let's say we configured a retry rate limit of 10 retries per > hour and a non-sink operator A. If the operator fails once every 10 minutes > and produces output between failures, it should not lead to job termination. > But if the operator fails 11 times in an hour or does not produce output > between 11 consecutive failures, job should be terminated.
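The rate-limit semantics from the example in the issue (at most 10 restarts per hour) can be sketched with a sliding window of failure timestamps; this is an illustrative helper, not Flink's restart-strategy API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Allow a restart only if at most maxFailures occurred within the last windowMillis.
class RetryRateLimiter {
    private final int maxFailures;
    private final long windowMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    RetryRateLimiter(int maxFailures, long windowMillis) {
        this.maxFailures = maxFailures;
        this.windowMillis = windowMillis;
    }

    /** Record a failure at the given time; return true if a restart is still allowed. */
    boolean recordFailureAndCheck(long nowMillis) {
        failureTimes.addLast(nowMillis);
        // Drop failures that fell out of the sliding window.
        while (!failureTimes.isEmpty()
                && nowMillis - failureTimes.peekFirst() > windowMillis) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() <= maxFailures;
    }
}
```

With `new RetryRateLimiter(10, 3_600_000L)`, a failure every 10 minutes never trips the limit, while an 11th failure inside one hour does, matching the scenario described above.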
[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302555#comment-15302555 ] Konstantin Knauf commented on FLINK-3758: - [~aljoscha] Yes, in particular my goal is to monitor the number of currently buffered events, but in more advanced triggers there are certainly other points of interest. Is there a design document for these metrics? As far as I understand it, metrics will not replace accumulators, right? > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator.
[jira] [Commented] (FLINK-3978) Add containsBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302501#comment-15302501 ] Greg Hogan commented on FLINK-3978: --- That was my first thought, and I wanted to name the ticket "add hassers to RuntimeContext", but I deferred to {{Collections}}'s nomenclature when typing this out. > Add containsBroadcastVariable method to RuntimeContext > -- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit.
[jira] [Commented] (FLINK-3978) Add containsBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302482#comment-15302482 ] Stephan Ewen commented on FLINK-3978: - How about calling it {{hasBroadcastVariable}}? > Add containsBroadcastVariable method to RuntimeContext > -- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit.
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302384#comment-15302384 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r64779643 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java --- @@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception { for (int position : fields) { // Save position of compared key // Get both values - both implement comparable - Comparable comparable1 = value1.getFieldNotNull(position); - Comparable comparable2 = value2.getFieldNotNull(position); + Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position); --- End diff -- @tillrohrmann - Any chance of a review on the updated push? > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API.
[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r64779643 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java --- @@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception { for (int position : fields) { // Save position of compared key // Get both values - both implement comparable - Comparable comparable1 = value1.getFieldNotNull(position); - Comparable comparable2 = value2.getFieldNotNull(position); + Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position); --- End diff -- @tillrohrmann - Any chance of a review on the updated push?
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1981#issuecomment-221926857 Thanks for looking into how Calcite splits the join conditions. You are right, we do not need to be "smarter" than Calcite and approve predicates that will be later rejected. So +1 for your suggestion. Apart from the documentation update I would only extend the error messages with the conditions that cause the failure. After that the PR should be good to merge.
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1981#discussion_r64778861 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -298,11 +338,40 @@ case class Join( val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join] if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) { failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean") -} else if (!ambiguousName.isEmpty) { +} else if (ambiguousName.nonEmpty) { failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}") } + +resolvedJoin.condition.foreach(testJoinCondition(_)) resolvedJoin } + + private def testJoinCondition(expression: Expression): Unit = { +def checkIfJoinCondition(exp : BinaryComparison) = exp.children match { +case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil + if x.isFromLeftInput != y.isFromLeftInput => Unit +case _ => failValidation( + s"Only join predicates supported. For non-join predicates use Table#where.") + } + +var equiJoinFound = false +def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match { + case x: And => x.children.foreach(validateConditions(_, isAndBranch)) + case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false)) + case x: EqualTo => +if (isAndBranch) { + equiJoinFound = true +} +checkIfJoinCondition(x) + case x: BinaryComparison => checkIfJoinCondition(x) + case x => failValidation(s"Unsupported condition type: ${x.getClass.getSimpleName}.") +} + +validateConditions(expression, isAndBranch = true) +if (!equiJoinFound) { + failValidation(s"At least one equi-join required.") --- End diff -- Add the whole join predicate to the error message?
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1981#discussion_r64778778 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -298,11 +338,40 @@ case class Join( val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join] if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) { failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean") -} else if (!ambiguousName.isEmpty) { +} else if (ambiguousName.nonEmpty) { failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}") } + +resolvedJoin.condition.foreach(testJoinCondition(_)) resolvedJoin } + + private def testJoinCondition(expression: Expression): Unit = { +def checkIfJoinCondition(exp : BinaryComparison) = exp.children match { +case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil + if x.isFromLeftInput != y.isFromLeftInput => Unit +case _ => failValidation( + s"Only join predicates supported. For non-join predicates use Table#where.") --- End diff -- Add the condition that causes the failure to the error message.
[jira] [Commented] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302332#comment-15302332 ] ASF GitHub Bot commented on FLINK-3034: --- Github user rphillips commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221921229 Very handy. Thanks > Redis Sink Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user rphillips commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221921229 Very handy. Thanks
[jira] [Updated] (FLINK-3978) Add containsBroadcastVariable methods to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3978: -- Summary: Add containsBroadcastVariable methods to RuntimeContext (was: Add contains methods to RuntimeContext) > Add containsBroadcastVariable methods to RuntimeContext > --- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit.
[jira] [Updated] (FLINK-3978) Add containsBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3978: -- Summary: Add containsBroadcastVariable method to RuntimeContext (was: Add containsBroadcastVariable methods to RuntimeContext) > Add containsBroadcastVariable method to RuntimeContext > -- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit.
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221913785 Actually, it's quite easy to implement a Redis sink yourself. Check out the `RedisResultSink` in this example: https://github.com/dataArtisans/yahoo-streaming-benchmark/blob/b3d35a761bae468affed92ef70d11739ddc9d432/flink-benchmarks/src/main/java/flink/benchmark/AdvertisingTopologyFlinkWindows.java#L319
[jira] [Commented] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302297#comment-15302297 ] ASF GitHub Bot commented on FLINK-3034: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221913785 Actually, it's quite easy to implement a Redis sink yourself. Check out the `RedisResultSink` in this example: https://github.com/dataArtisans/yahoo-streaming-benchmark/blob/b3d35a761bae468affed92ef70d11739ddc9d432/flink-benchmarks/src/main/java/flink/benchmark/AdvertisingTopologyFlinkWindows.java#L319 > Redis Sink Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033
[jira] [Commented] (FLINK-3979) [documentation]add missed import classes in run_example_quickstart
[ https://issues.apache.org/jira/browse/FLINK-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302267#comment-15302267 ] ASF GitHub Bot commented on FLINK-3979: --- GitHub user jiazhai opened a pull request: https://github.com/apache/flink/pull/2038 [FLINK-3979] documentation - add missed import classes in run_example_quickstart The classes that need to be imported for this part of code ``` result .map(new MapFunction, String>() { @Override public String map(Tuple2 tuple) { return tuple.toString(); } }) .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema())); ``` is ``` import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.api.common.functions.MapFunction; ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiazhai/flink f3979 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2038.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2038 commit 84de7466757c86e968eb9ddce8ba7865821e9091 Author: Zhai Jia Date: 2016-05-26T15:24:40Z [FLINK-3979] add import for code > [documentation]add missed import classes in run_example_quickstart > -- > > Key: FLINK-3979 > URL: https://issues.apache.org/jira/browse/FLINK-3979 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Jia Zhai > > The classes that need to be imported for this part of code > {code} > result > .map(new MapFunction , String>() { > @Override > public String map(Tuple2 tuple) { > return tuple.toString(); > } > }) > .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new > SimpleStringSchema())); > {code} > is > {code} > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; > 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; > import org.apache.flink.api.common.functions.MapFunction; > {code}
[jira] [Commented] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302266#comment-15302266 ] ASF GitHub Bot commented on FLINK-3034: --- Github user rphillips commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221908345 What is the status on this PR? This PR would be very handy for my use case. > Redis Sink Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user rphillips commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-221908345 What is the status on this PR? With my use case this PR would be very handy. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3979] documentation - add missed import...
GitHub user jiazhai opened a pull request: https://github.com/apache/flink/pull/2038 [FLINK-3979] documentation - add missed import classes in run_example_quickstart The classes that need to be imported for this part of code

```
result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
```

are

```
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
```

You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiazhai/flink f3979 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2038.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2038 commit 84de7466757c86e968eb9ddce8ba7865821e9091 Author: Zhai Jia Date: 2016-05-26T15:24:40Z [FLINK-3979] add import for code
[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer
[ https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302254#comment-15302254 ] ASF GitHub Bot commented on FLINK-3923: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64767022 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) { public void open(Configuration parameters) throws Exception { super.open(parameters); - KinesisProducerConfiguration config = new KinesisProducerConfiguration(); - config.setRegion(this.region); - config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey))); + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); + + producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)); + producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); //config.setCollectionMaxCount(1); //config.setAggregationMaxCount(1); --- End diff -- I agree that using the KPL is in general a good idea. The problem is that the license of the KPL (the Amazon Software License) is not compatible with the Apache Software License, therefore, projects at Apache can not depend on such code. We will not release the Flink kinesis connector with the next Flink release for that reason (unless we somehow fix this issue). Are you working at Amazon? If so, can you contact me at robert (at) data-artisans.com ? 
> Unify configuration conventions of the Kinesis producer to the same as the > consumer > --- > > Key: FLINK-3923 > URL: https://issues.apache.org/jira/browse/FLINK-3923 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Abdullah Ozturk > > Currently, the Kinesis consumer and producer are configured differently. > The producer expects a list of arguments for the access key, secret, region, > stream. The consumer is accepting properties (similar to the Kafka connector). > The objective of this issue is to change the producer so that it is also > using a properties-based configuration (including an input validation step) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64767022 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) { public void open(Configuration parameters) throws Exception { super.open(parameters); - KinesisProducerConfiguration config = new KinesisProducerConfiguration(); - config.setRegion(this.region); - config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey))); + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); + + producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)); + producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); //config.setCollectionMaxCount(1); //config.setAggregationMaxCount(1); --- End diff -- I agree that using the KPL is in general a good idea. The problem is that the license of the KPL (the Amazon Software License) is not compatible with the Apache Software License, therefore, projects at Apache can not depend on such code. We will not release the Flink kinesis connector with the next Flink release for that reason (unless we somehow fix this issue). Are you working at Amazon? If so, can you contact me at robert (at) data-artisans.com ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1981#issuecomment-221903295 Ah, sorry. No, I didn't see your comment. Will think about that and reply later. Thanks! :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types
[ https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-3936. Resolution: Implemented Implemented with 5784f395536d8a5e6f7d8bfd28bd9c8c0ed99b18 > Add MIN / MAX aggregations function for BOOLEAN types > - > > Key: FLINK-3936 > URL: https://issues.apache.org/jira/browse/FLINK-3936 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.1.0 > > > When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on > Boolean literals to translate a decorrelate subquery in an {{EXIST}} > predicate. > MIN and MAX aggregates on Boolean data types are currently not supported and > should be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
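An aside on the semantics in the issue above: because `false < true`, MIN over a group of booleans behaves like logical AND and MAX like logical OR, which is why Calcite can encode an EXISTS check as a MIN aggregate over boolean literals. A minimal sketch (plain Python, not Flink or Calcite code):

```python
# Under the ordering False < True, MIN over booleans is AND and MAX is OR.
def bool_min(values):
    return all(values)  # equivalent to min(values) for non-empty boolean input

def bool_max(values):
    return any(values)  # equivalent to max(values) for non-empty boolean input

print(bool_min([True, True, False]))   # False
print(bool_max([False, False, True]))  # True
```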
[jira] [Closed] (FLINK-3696) Some Union tests fail for TableConfigMode.EFFICIENT
[ https://issues.apache.org/jira/browse/FLINK-3696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-3696. Resolution: Fixed Assignee: Yijie Shen Fix Version/s: 1.1.0 Fixed with ef5832d8f5867826a60f87e2fcaef912dc2950f6 > Some Union tests fail for TableConfigMode.EFFICIENT > --- > > Key: FLINK-3696 > URL: https://issues.apache.org/jira/browse/FLINK-3696 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Yijie Shen > Fix For: 1.1.0 > > > e.g. testUnionWithFilter gives the following exception: > {code} > org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of > different types. Input1=scala.Tuple3(_1: Integer, _2: Long, _3: String), > input2=Java Tuple3> at > org.apache.flink.api.java.operators.UnionOperator.(UnionOperator.java:47) > at org.apache.flink.api.java.DataSet.union(DataSet.java:1208) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetUnion.translateToPlan(DataSetUnion.scala:81) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:95) > at > org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:91) > at > org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51) > at > org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:43) > at > org.apache.flink.api.scala.table.test.UnionITCase.testUnionWithFilter(UnionITCase.scala:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at org.junit.runners.Suite.runChild(Suite.java:127) > at org.junit.runners.Suite.runChild(Suite.java:26) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at org.junit.runners.Suite.runChild(Suite.java:127) > at org.junit.runners.Suite.runChild(Suite.java:26) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at org.junit.runner.JUnitCore.run(JUnitCore.java:160) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68) >
[jira] [Closed] (FLINK-3941) Add support for UNION (with duplicate elimination)
[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-3941. Resolution: Implemented Fix Version/s: 1.1.0 Implemented with ef5832d8f5867826a60f87e2fcaef912dc2950f6 > Add support for UNION (with duplicate elimination) > -- > > Key: FLINK-3941 > URL: https://issues.apache.org/jira/browse/FLINK-3941 > Project: Flink > Issue Type: New Feature > Components: Table API > Affects Versions: 1.1.0 > Reporter: Fabian Hueske > Assignee: Yijie Shen > Priority: Minor > Fix For: 1.1.0 > > Currently, only UNION ALL is supported by the Table API and SQL. > UNION (with duplicate elimination) can be supported by applying a > {{DataSet.distinct()}} after the union on all fields. This issue includes: > - Extending {{DataSetUnion}} > - Relaxing {{DataSetUnionRule}} to translate non-all unions. > - Extending the Table API with a union() method.
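The approach described in the issue (UNION = UNION ALL followed by a distinct over all fields) can be sketched in plain Python (illustrative only, not the Table API implementation):

```python
# UNION ALL concatenates the inputs; UNION additionally applies a distinct
# over the full row, which is exactly the DataSet.distinct() step above.
def union_all(left, right):
    return left + right

def union(left, right):
    seen, out = [], []
    for row in left + right:  # order-preserving distinct over all fields
        if row not in seen:
            seen.append(row)
            out.append(row)
    return out

print(union([(1, 'x'), (2, 'y')], [(2, 'y'), (3, 'z')]))  # [(1, 'x'), (2, 'y'), (3, 'z')]
```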
[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2025
[jira] [Commented] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types
[ https://issues.apache.org/jira/browse/FLINK-3936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302217#comment-15302217 ] ASF GitHub Bot commented on FLINK-3936: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2035 > Add MIN / MAX aggregations function for BOOLEAN types > - > > Key: FLINK-3936 > URL: https://issues.apache.org/jira/browse/FLINK-3936 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.1.0 > > > When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on > Boolean literals to translate a decorrelate subquery in an {{EXIST}} > predicate. > MIN and MAX aggregates on Boolean data types are currently not supported and > should be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user dawidwys commented on the pull request: https://github.com/apache/flink/pull/1981#issuecomment-221901395 I am not sure if you've seen my comment (since it was on a previous commit version). Let me paste my doubts about CNF one more time: > I am not sure if the normalization to CNF is necessary. Calcite does not do it, but searches just the AND operators' subtrees for equi conditions. E.g. for the condition (l.a = r.b AND l.c = r.d) OR (l.a = r.b AND l.e = r.f), after transformation to CNF it will be possible to find an equi condition, but Calcite does not find it (see RelOptUtil#splitJoinCondition), which results in a failing DataSet join. > > My proposition would be: > > traverse the whole tree (both OR and AND branches) checking that only join conditions exist > check that an equi condition exists in an AND branch
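The failure mode described in this comment can be sketched as follows (plain Python with made-up `And`/`Or`/`Eq` node classes, not Calcite's actual types): a scan that only descends through AND nodes misses an equi-condition that every OR branch contains.

```python
from dataclasses import dataclass

@dataclass
class Eq:   # equi-condition between a left-side and a right-side field
    left: str
    right: str

@dataclass
class And:
    a: object
    b: object

@dataclass
class Or:
    a: object
    b: object

def equi_in_and_conjuncts(expr):
    """Collect Eq nodes reachable through And nodes only (conjunct-only scan)."""
    if isinstance(expr, Eq):
        return [expr]
    if isinstance(expr, And):
        return equi_in_and_conjuncts(expr.a) + equi_in_and_conjuncts(expr.b)
    return []  # deliberately does not descend into Or

# (l.a = r.b AND x) OR (l.a = r.b AND y): every OR branch carries the
# equi-condition, yet the conjunct-only scan finds nothing.
cond = Or(And(Eq("l.a", "r.b"), "x"), And(Eq("l.a", "r.b"), "y"))
print(equi_in_and_conjuncts(cond))  # []
```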
[jira] [Commented] (FLINK-3941) Add support for UNION (with duplicate elimination)
[ https://issues.apache.org/jira/browse/FLINK-3941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302218#comment-15302218 ] ASF GitHub Bot commented on FLINK-3941: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2025 > Add support for UNION (with duplicate elimination) > -- > > Key: FLINK-3941 > URL: https://issues.apache.org/jira/browse/FLINK-3941 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Yijie Shen >Priority: Minor > > Currently, only UNION ALL is supported by Table API and SQL. > UNION (with duplicate elimination) can be supported by applying a > {{DataSet.distinct()}} after the union on all fields. This issue includes: > - Extending {{DataSetUnion}} > - Relaxing {{DataSetUnionRule}} to translated non-all unions. > - Extend the Table API with union() method. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3936] [tableAPI] Add MIN/MAX aggregatio...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2035
[jira] [Created] (FLINK-3979) [documentation]add missed import classes in run_example_quickstart
Jia Zhai created FLINK-3979: --- Summary: [documentation] add missed import classes in run_example_quickstart Key: FLINK-3979 URL: https://issues.apache.org/jira/browse/FLINK-3979 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Jia Zhai The classes that need to be imported for this part of code

{code}
result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
{code}

are

{code}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
{code}
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/1981#discussion_r64763844 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -342,21 +368,29 @@ case class Join( } private def testJoinCondition(expression: Expression): Unit = { -def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) { - failValidation(s"Only join predicates supported. For non-join predicates use Table#where.") -} +def checkIfJoinCondition(exp : BinaryComparison) = + if (exp.children match { +case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil => + x.belongsToLeft == y.belongsToLeft +case _ => true + }) { +failValidation(s"Only join predicates supported. For non-join predicates use Table#where.") + } var equiJoinFound = false -def validateConditions(exp: Expression) : Unit = exp match { - case x: And => x.children.foreach(validateConditions(_)) +def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match { --- End diff -- You're absolutely right, my mistake, sorry! I fixed it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1981#issuecomment-221899756 Hi @dawidwys, the equi-join check is a bit more involved. I implemented a CNF converter based on the algorithm described here: http://cs.jhu.edu/~jason/tutorials/convert-to-CNF (see my [branch](https://github.com/fhueske/flink/tree/tableOuter)). You can try to implement the algorithm as well. Alternatively, we can also use my version. Another thing we need to fix is the documentation, i.e., extending the Table API description and removing the outer join restriction in the SQL section in `docs/apis/table.md`.
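The CNF conversion under discussion boils down to distributing OR over AND. A minimal sketch (plain Python tuples standing in for expression nodes; it assumes the input is already negation-free, unlike a full converter such as the one linked above):

```python
# Atoms are strings; composites are ('and', a, b) or ('or', a, b).
def to_cnf(e):
    if not isinstance(e, tuple):
        return e  # atom
    op, a, b = e
    a, b = to_cnf(a), to_cnf(b)
    if op == 'and':
        return ('and', a, b)
    # op == 'or': distribute over an AND child so only ORs remain inside
    if isinstance(a, tuple) and a[0] == 'and':
        return ('and', to_cnf(('or', a[1], b)), to_cnf(('or', a[2], b)))
    if isinstance(b, tuple) and b[0] == 'and':
        return ('and', to_cnf(('or', a, b[1])), to_cnf(('or', a, b[2])))
    return ('or', a, b)

# (eq AND x) OR (eq AND y): after distribution, the conjunct ('or', 'eq', 'eq')
# exposes the shared equi-condition that a conjunct-only scan can then find.
print(to_cnf(('or', ('and', 'eq', 'x'), ('and', 'eq', 'y'))))
```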
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1981#discussion_r64762144 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -342,21 +368,29 @@ case class Join( } private def testJoinCondition(expression: Expression): Unit = { -def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) { - failValidation(s"Only join predicates supported. For non-join predicates use Table#where.") -} +def checkIfJoinCondition(exp : BinaryComparison) = + if (exp.children match { +case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil => + x.belongsToLeft == y.belongsToLeft +case _ => true + }) { +failValidation(s"Only join predicates supported. For non-join predicates use Table#where.") + } var equiJoinFound = false -def validateConditions(exp: Expression) : Unit = exp match { - case x: And => x.children.foreach(validateConditions(_)) +def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match { --- End diff -- I'm sorry but this check does not catch all cases. For example, it would let `l.a = r.b OR (l.c = r.d && l.e = r.f)` pass. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/1895
[jira] [Commented] (FLINK-3889) Make File Monitoring Function checkpointable.
[ https://issues.apache.org/jira/browse/FLINK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302184#comment-15302184 ] ASF GitHub Bot commented on FLINK-3889: --- Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/1984 > Make File Monitoring Function checkpointable. > - > > Key: FLINK-3889 > URL: https://issues.apache.org/jira/browse/FLINK-3889 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > This is essentially the combination of FLINK-3808 and FLINK-3717. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3717) Add functionality to be able to restore from a specific point in a FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302185#comment-15302185 ] ASF GitHub Bot commented on FLINK-3717: --- Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/1895 > Add functionality to be a able to restore from specific point in a > FileInputFormat > -- > > Key: FLINK-3717 > URL: https://issues.apache.org/jira/browse/FLINK-3717 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > This is the first step in order to make the File Sources fault-tolerant. We > have to be able to get the start from a specific point in a file despite any > caching performed during reading. This will guarantee that the task that will > take over the execution of the failed one will be able to start from the > correct point in the file. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/1984
[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302164#comment-15302164 ] ASF GitHub Bot commented on FLINK-3655: --- Github user gna-phetsarath commented on the pull request: https://github.com/apache/flink/pull/1990#issuecomment-221890183 You are correct, the majority of the changes were in the "generate splits" method and "statistics" methods which included changes to subclasses that used the file path directly. Not as extensive as it appears. Also, additional tests were added. > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > Labels: starter > > Allow comma-separated or multiple directories to be specified for > FileInputFormat so that a DataSource will process the directories > sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
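The comma-separated variant proposed in the issue reduces to a small parsing step before split generation; a hypothetical helper (illustrative only, not Flink's actual API) might look like:

```python
# Split a comma-separated path spec, as in the proposed
# env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*"), into the
# individual directories a source would then process sequentially.
def split_paths(spec):
    return [p.strip() for p in spec.split(",") if p.strip()]

print(split_paths("/data/2016/01/01/*/*,/data/2016/01/02/*/*"))
# ['/data/2016/01/01/*/*', '/data/2016/01/02/*/*']
```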
[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer
[ https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302170#comment-15302170 ] ASF GitHub Bot commented on FLINK-3923: --- Github user aozturk commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64757778 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) { public void open(Configuration parameters) throws Exception { super.open(parameters); - KinesisProducerConfiguration config = new KinesisProducerConfiguration(); - config.setRegion(this.region); - config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey))); + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); + + producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)); + producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); //config.setCollectionMaxCount(1); //config.setAggregationMaxCount(1); --- End diff -- Ah sorry I have returned back to the code to be sure and it seems KPL is already used. > Unify configuration conventions of the Kinesis producer to the same as the > consumer > --- > > Key: FLINK-3923 > URL: https://issues.apache.org/jira/browse/FLINK-3923 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Abdullah Ozturk > > Currently, the Kinesis consumer and producer are configured differently. > The producer expects a list of arguments for the access key, secret, region, > stream. The consumer is accepting properties (similar to the Kafka connector). 
> The objective of this issue is to change the producer so that it is also > using a properties-based configuration (including an input validation step) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64757778 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) { public void open(Configuration parameters) throws Exception { super.open(parameters); - KinesisProducerConfiguration config = new KinesisProducerConfiguration(); - config.setRegion(this.region); - config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey))); + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); + + producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)); + producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); //config.setCollectionMaxCount(1); //config.setAggregationMaxCount(1); --- End diff -- Ah sorry I have returned back to the code to be sure and it seems KPL is already used. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3655] Multiple File Paths for InputFile...
Github user gna-phetsarath commented on the pull request: https://github.com/apache/flink/pull/1990#issuecomment-221890183 You are correct, the majority of the changes were in the "generate splits" method and "statistics" methods which included changes to subclasses that used the file path directly. Not as extensive as it appears. Also, additional tests were added. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer
[ https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302154#comment-15302154 ] ASF GitHub Bot commented on FLINK-3923: --- Github user aozturk commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64756634 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) { public void open(Configuration parameters) throws Exception { super.open(parameters); - KinesisProducerConfiguration config = new KinesisProducerConfiguration(); - config.setRegion(this.region); - config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey))); + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); + + producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)); + producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); //config.setCollectionMaxCount(1); //config.setAggregationMaxCount(1); --- End diff -- Sorry again for my late response. Meanwhile I have thought about this and I have come up with a question. I had noticed a remark about not using the KCL library, but is there any special reason why we are not using KPL? Anything we intend to do will probably replicate it including retry mechanism on failures, multi-threading and asynchronous writes. We also need to take into consideration that our producer solution should be cost effective by using the opportunities for batching, which can be customized by the user if low latency is preferred. 
> Unify configuration conventions of the Kinesis producer to the same as the > consumer > --- > > Key: FLINK-3923 > URL: https://issues.apache.org/jira/browse/FLINK-3923 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Abdullah Ozturk > > Currently, the Kinesis consumer and producer are configured differently. > The producer expects a list of arguments for the access key, secret, region, > stream. The consumer is accepting properties (similar to the Kafka connector). > The objective of this issue is to change the producer so that it is also > using a properties-based configuration (including an input validation step) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
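The properties-based configuration with an input validation step that the issue asks for could look roughly like the following. This is a minimal standalone sketch: the class name and property keys are illustrative assumptions, not Flink's actual `KinesisConfigConstants`.

```java
import java.util.Properties;

// Hypothetical sketch of a properties-based producer configuration check,
// mirroring the consumer's style. Key names here are assumptions.
class ProducerConfigCheck {
    static final String AWS_REGION = "aws.region";
    static final String AWS_ACCESS_KEY = "aws.credentials.provider.basic.accesskeyid";
    static final String AWS_SECRET_KEY = "aws.credentials.provider.basic.secretkey";

    /** Validates that all required keys are present and non-empty, then returns the config. */
    static Properties validate(Properties config) {
        for (String key : new String[] {AWS_REGION, AWS_ACCESS_KEY, AWS_SECRET_KEY}) {
            String value = config.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, "us-east-1");
        props.setProperty(AWS_ACCESS_KEY, "accessKey");
        props.setProperty(AWS_SECRET_KEY, "secretKey");
        validate(props); // passes; removing any of the three keys would throw
    }
}
```

A producer constructor taking a single `Properties` argument and calling such a check up front would match the consumer's convention described in the issue.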
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302127#comment-15302127 ] Stephan Ewen commented on FLINK-3974: - Yes, that is a pretty clear bug. I guess the best workaround for now is to disable the object reuse mode. Object reuse does not really work well in the DataStream API streaming right now, it works pretty well in the DataSet API. Another quick workaround is to not chain the two different map functions {{.disableChaining()}}. The solution should be quite straightforward, though: - Not chain and "splitting" flows any more. I would actually like that solution. For splitting flows, it seems like a good heuristic to start a new chain/thread by default. - Each collector should use its own dedicated stream record. That would circumvent the ClassCast at least, but still be dangerous if the mappers actually alter the events. > enableObjectReuse fails when an operator chains to multiple downstream > operators > > > Key: FLINK-3974 > URL: https://issues.apache.org/jira/browse/FLINK-3974 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 >Reporter: B Wyatt > Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch > > > Given a topology that looks like this: > {code:java} > DataStream input = ... > input > .map(MapFunction...) > .addSink(...); > input > .map(MapFunction...) > ​.addSink(...); > {code} > enableObjectReuse() will cause an exception in the form of > {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown. > It looks like the input operator calls {{Output.collect}} > which attempts to loop over the downstream operators and process them. > However, the first map operation will call {{StreamRecord<>.replace}} which > mutates the value stored in the StreamRecord<>. 
> As a result, when the {{Output.collect}} call passes the > {{StreamRecord<A>}} to the second map operation it is actually a > {{StreamRecord<B>}} and behaves as if the two map operations were serial > instead of parallel. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
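The failure mode can be modeled without Flink at all: with object reuse, the chained outputs share one record holder, so an in-place `replace` in the first chain corrupts the value the sibling chain receives. A minimal standalone model (class and method names are illustrative, not Flink's actual runtime classes):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the object-reuse bug: one mutable record is fanned out to
// two "chained" map operators, and the first one mutates it in place.
class ObjectReuseModel {
    static class StreamRecord {
        Object value;
        StreamRecord replace(Object newValue) { this.value = newValue; return this; }
    }

    /** Fans a single shared record out to two chains; returns what each chain observed. */
    static List<Object> collectWithReuse(Object input) {
        StreamRecord shared = new StreamRecord();
        shared.value = input;
        List<Object> seen = new ArrayList<>();
        // chain 1: maps A -> B by mutating the shared record in place
        StreamRecord afterMap1 = shared.replace("B:" + shared.value);
        seen.add(afterMap1.value);
        // chain 2: expects the original input, but the shared record was already mutated
        seen.add(shared.value);
        return seen;
    }

    public static void main(String[] args) {
        // both chains observe the mutated value "B:A"; chain 2 never sees "A"
        System.out.println(collectWithReuse("A"));
    }
}
```

The workarounds from the comment above map directly onto this model: leaving object reuse disabled (the default) corresponds to handing each chain its own copy, and `.disableChaining()` corresponds to never sharing the record holder in the first place.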
[jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink
[ https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302143#comment-15302143 ] ASF GitHub Bot commented on FLINK-3967: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2031#issuecomment-221886697 Hi @mans2singh, thanks a lot for this contribution. I wonder what's your motivation to implement a flink --> rethinkdb connector? I'm asking because the Flink community cannot accept every contribution, in particular if it's adding a large new component we need to maintain. Only if there are enough users and developers willing to support a feature can we accept the contribution. I'm a bit sorry to tell you this after you've implemented the code; on the other hand, our guidelines [1] clearly state that new features need to be discussed with the community first. The good news is that we have a special section for community-maintained modules on our website [2]. Therefore, I suggest that you put the code into a separate github repository and open a pull request to add a link to that repository. Another good thing about this approach is that we don't have to fix the license issue immediately. Also, it doesn't mean that I'm against this contribution at all. If we see many users using the module from your github repository, and there are people in the community willing to maintain it, we can also look into merging it.
[1] http://flink.apache.org/contribute-code.html#before-you-start-coding [2] http://flink.apache.org/community.html#third-party-packages > Provide RethinkDB Sink for Flink > > > Key: FLINK-3967 > URL: https://issues.apache.org/jira/browse/FLINK-3967 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 1.0.3 > Environment: All >Reporter: Mans Singh >Assignee: Mans Singh >Priority: Minor > Labels: features > Fix For: 1.1.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Provide Sink to stream data from flink to rethink db. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer
[ https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302141#comment-15302141 ] ASF GitHub Bot commented on FLINK-3923: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-221885858 Sure, no problem at all. It's just for my own time allocation. Thanks for the quick response. > Unify configuration conventions of the Kinesis producer to the same as the > consumer > --- > > Key: FLINK-3923 > URL: https://issues.apache.org/jira/browse/FLINK-3923 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Abdullah Ozturk > > Currently, the Kinesis consumer and producer are configured differently. > The producer expects a list of arguments for the access key, secret, region, > stream. The consumer is accepting properties (similar to the Kafka connector). > The objective of this issue is to change the producer so that it is also > using a properties-based configuration (including an input validation step) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3978) Add contains methods to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3978: -- Description: The javadocs for `RuntimeContext` state that `getAccumulator` "throws an exception if the accumulator does not exist or if the accumulator exists, but with different type", although `AbstractRuntimeUDFContext` does not throw an exception but will return null. The javadocs for `getBroadcastVariable` do not mention throwing an exception. Currently the only way to handle a broadcast variable that that may or may not exist is to catch and ignore the exception. Adding a `containsBroadcastVariable` method to `RuntimeContext` would make this explicit. was: The javadocs for `RuntimeContext` state that `getAccumulator` "throws an exception if the accumulator does not exist or if the accumulator exists, but with different type", although `AbstractRuntimeUDFContext` does not throw an exception but will return null. The javadocs for `getBroadcastVariable` do not mention throwing an exception. Currently the only way to handle a broadcast variable that that may or may not exist is to catch and ignore the exception. Adding a `containsBroadcastVariable` method to `RuntimeContext` would make this explicit. Likewise, `containsAccumulator`. > Add contains methods to RuntimeContext > -- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for `RuntimeContext` state that `getAccumulator` "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although `AbstractRuntimeUDFContext` does not throw an > exception but will return null. > The javadocs for `getBroadcastVariable` do not mention throwing an exception. 
> Currently the only way to handle a broadcast variable that may or may > not exist is to catch and ignore the exception. Adding a > `containsBroadcastVariable` method to `RuntimeContext` would make this > explicit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer
[ https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302139#comment-15302139 ] ASF GitHub Bot commented on FLINK-3923: --- Github user aozturk commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-221885430 Sorry I have had an extra duty for this week, but I have already started working on it and hope to wrap this evening. > Unify configuration conventions of the Kinesis producer to the same as the > consumer > --- > > Key: FLINK-3923 > URL: https://issues.apache.org/jira/browse/FLINK-3923 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Abdullah Ozturk > > Currently, the Kinesis consumer and producer are configured differently. > The producer expects a list of arguments for the access key, secret, region, > stream. The consumer is accepting properties (similar to the Kafka connector). > The objective of this issue is to change the producer so that it is also > using a properties-based configuration (including an input validation step) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3978) Add contains methods to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3978: -- Description: The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an exception if the accumulator does not exist or if the accumulator exists, but with different type", although {{AbstractRuntimeUDFContext}} does not throw an exception but will return null. The javadocs for {{getBroadcastVariable}} do not mention throwing an exception. Currently the only way to handle a broadcast variable that that may or may not exist is to catch and ignore the exception. Adding a {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this explicit. was: The javadocs for `RuntimeContext` state that `getAccumulator` "throws an exception if the accumulator does not exist or if the accumulator exists, but with different type", although `AbstractRuntimeUDFContext` does not throw an exception but will return null. The javadocs for `getBroadcastVariable` do not mention throwing an exception. Currently the only way to handle a broadcast variable that that may or may not exist is to catch and ignore the exception. Adding a `containsBroadcastVariable` method to `RuntimeContext` would make this explicit. > Add contains methods to RuntimeContext > -- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. 
Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
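The proposed explicit check can be sketched with a minimal standalone model; the class is illustrative and only mirrors the relevant shape of `RuntimeContext`, it is not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the proposed contains-style check: getBroadcastVariable
// throws when the variable is absent, so call sites currently must catch and
// ignore; containsBroadcastVariable makes the existence check explicit.
class BroadcastContext {
    private final Map<String, Object> broadcastVars = new HashMap<>();

    void register(String name, Object value) { broadcastVars.put(name, value); }

    /** Mirrors the documented behavior: throws if the variable does not exist. */
    Object getBroadcastVariable(String name) {
        Object v = broadcastVars.get(name);
        if (v == null) throw new IllegalArgumentException("No broadcast variable: " + name);
        return v;
    }

    /** The proposed explicit check, replacing catch-and-ignore at call sites. */
    boolean containsBroadcastVariable(String name) { return broadcastVars.containsKey(name); }
}
```

A call site could then write `if (ctx.containsBroadcastVariable("number of vertices")) { ... }` instead of wrapping the getter in a try/catch.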
[jira] [Commented] (FLINK-3978) Add contains methods to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302133#comment-15302133 ] Greg Hogan commented on FLINK-3978: --- Thanks! I removed that bit from the description. Only looking at {{containsBroadcastVariable}} now. > Add contains methods to RuntimeContext > -- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for `RuntimeContext` state that `getAccumulator` "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although `AbstractRuntimeUDFContext` does not throw an > exception but will return null. > The javadocs for `getBroadcastVariable` do not mention throwing an exception. > Currently the only way to handle a broadcast variable that that may or may > not exist is to catch and ignore the exception. Adding a > `containsBroadcastVariable` method to `RuntimeContext` would make this > explicit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer
[ https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302123#comment-15302123 ] ASF GitHub Bot commented on FLINK-3923: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-221882616 Just for my planning, when do you think you'll find time to address the remaining issues? > Unify configuration conventions of the Kinesis producer to the same as the > consumer > --- > > Key: FLINK-3923 > URL: https://issues.apache.org/jira/browse/FLINK-3923 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Abdullah Ozturk > > Currently, the Kinesis consumer and producer are configured differently. > The producer expects a list of arguments for the access key, secret, region, > stream. The consumer is accepting properties (similar to the Kafka connector). > The objective of this issue is to change the producer so that it is also > using a properties-based configuration (including an input validation step) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3970) How to deal with "resouce isolation" problem
[ https://issues.apache.org/jira/browse/FLINK-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302120#comment-15302120 ] Stephan Ewen commented on FLINK-3970: - Since this is a question, rather than a pending issue, are you okay with closing this issue? > How to deal with "resouce isolation" problem > > > Key: FLINK-3970 > URL: https://issues.apache.org/jira/browse/FLINK-3970 > Project: Flink > Issue Type: Wish >Reporter: ZhengBowen > > For example, 'big query' and 'small query' are executed at the same time, you > need isolate 'big query' and 'small query' to prevent 'big query' exhaust > resouce(including i/o,mem,network) to make the 'small query' can complete > quickly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release
[ https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302121#comment-15302121 ] Ted Yu commented on FLINK-2765: --- API compatibility is maintained. However, HBASE-15198 caused regression which is being worked on. Stay tuned for the next release with fix. > Upgrade hbase version for hadoop-2 to 1.2 release > - > > Key: FLINK-2765 > URL: https://issues.apache.org/jira/browse/FLINK-2765 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > Currently 0.98.11 is used: > {code} > 0.98.11-hadoop2 > {code} > Stable release for hadoop-2 is 1.1.x line > We should upgrade to 1.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release
[ https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302112#comment-15302112 ] Stephan Ewen commented on FLINK-2765: - Did HBase break any APIs, or is the upgrade trivially possible? > Upgrade hbase version for hadoop-2 to 1.2 release > - > > Key: FLINK-2765 > URL: https://issues.apache.org/jira/browse/FLINK-2765 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > Currently 0.98.11 is used: > {code} > 0.98.11-hadoop2 > {code} > Stable release for hadoop-2 is 1.1.x line > We should upgrade to 1.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE
[ https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302107#comment-15302107 ] ASF GitHub Bot commented on FLINK-3908: --- Github user fpompermaier commented on the pull request: https://github.com/apache/flink/pull/2007#issuecomment-221880071 I don't understand :( Assuming that `reset` could be renamed to `resetErrorStateAndParse`, the other 2 suggestions cannot be applied to my current implementation: if I want to rename `parseFieldImpl` to `parseField` I would have to overload the method somehow, which is not possible because there's already an abstract method called `parseField` with the same signature. With respect to leaving the responsibility of resetting the parser state to the classes calling `parseField`: that is quite dangerous IMHO (as you said in "GenericCsvInputFormat would call the resetErrorStateAndParse"). Am I misunderstanding something? > FieldParsers error state is not reset correctly to NONE > --- > > Key: FLINK-3908 > URL: https://issues.apache.org/jira/browse/FLINK-3908 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: parser > > If during the parse of a csv there's a parse error (for example when in an > integer column there are non-int values) the errorState is not reset > correctly in the next parseField call. A simple fix would be to add as a > first statement of the {{parseField()}} function a call to > {{setErrorState(ParseErrorState.NONE)}} but it is something that should be > handled better (by default) for every subclass of {{FieldParser}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
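The template-method shape under discussion (a public `parseField` that resets the error state and then delegates to `parseFieldImpl`) can be modeled in a few lines; this is a standalone sketch, not Flink's actual `FieldParser` hierarchy.

```java
// Minimal model of the FLINK-3908 fix: the base class clears stale error
// state at the start of every parse, so a failure on one field cannot leak
// into the next parseField call. Names are illustrative.
class IntFieldParser {
    enum ParseErrorState { NONE, NUMERIC_VALUE_FORMAT_ERROR }

    private ParseErrorState errorState = ParseErrorState.NONE;

    ParseErrorState getErrorState() { return errorState; }

    /** Template method: resets the error state, then delegates to the real parse. */
    int parseField(String field) {
        errorState = ParseErrorState.NONE; // the fix: reset before parsing
        return parseFieldImpl(field);
    }

    /** Subclass-style parse logic; sets the error state on failure. */
    private int parseFieldImpl(String field) {
        try {
            return Integer.parseInt(field);
        } catch (NumberFormatException e) {
            errorState = ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR;
            return -1;
        }
    }
}
```

With this shape, every subclass gets the reset for free, rather than relying on each caller (such as the CSV input format) to clear the state between fields.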
[jira] [Commented] (FLINK-3978) Add contains methods to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302101#comment-15302101 ] Aljoscha Krettek commented on FLINK-3978: - For the accumulators, I think the way to go is what is proposed as part of this: https://issues.apache.org/jira/browse/FLINK-3758, i.e. change the accumulator interface to allow getting an accumulator or adding a new one in one method call. > Add contains methods to RuntimeContext > -- > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The javadocs for `RuntimeContext` state that `getAccumulator` "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although `AbstractRuntimeUDFContext` does not throw an > exception but will return null. > The javadocs for `getBroadcastVariable` do not mention throwing an exception. > Currently the only way to handle a broadcast variable that that may or may > not exist is to catch and ignore the exception. Adding a > `containsBroadcastVariable` method to `RuntimeContext` would make this > explicit. Likewise, `containsAccumulator`. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
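The get-or-add accumulator access proposed via FLINK-3758 can be sketched as a single call that returns the existing accumulator or registers a fresh one; the class and method names here are assumptions, not the final Flink interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of combining "get accumulator" and "add accumulator" into one
// method call, as proposed in FLINK-3758. Names are illustrative.
class AccumulatorRegistry {
    private final Map<String, Object> accumulators = new HashMap<>();

    /** Returns the existing accumulator under this name, or registers a fresh one. */
    @SuppressWarnings("unchecked")
    <T> T getOrAddAccumulator(String name, Supplier<T> factory) {
        return (T) accumulators.computeIfAbsent(name, k -> factory.get());
    }
}
```

This removes both failure modes from the description: there is no null return and no exception to catch-and-ignore, because the accumulator always exists after the call.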
[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly
[ https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302095#comment-15302095 ] ASF GitHub Bot commented on FLINK-3806: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2036#discussion_r64749372 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java --- @@ -289,6 +300,11 @@ private GatherUdf(GatherFunctiongatherFunction, TypeInformation numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices"); + this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue()); + } catch (Exception e) { + } --- End diff -- I think that makes sense. > Revert use of DataSet.count() in Gelly > -- > > Key: FLINK-3806 > URL: https://issues.apache.org/jira/browse/FLINK-3806 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.1.0 > > > FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The > former returns a {{DataSet}} while the latter executes a job to return a Java > value. > {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and > {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and > {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the > user does not pass the number of vertices as a parameter. > As noted in FLINK-1632, this does make the code simpler but if my > understanding is correct will materialize the Graph twice. The Graph will > need to be reread from input, regenerated, or recomputed by preceding > algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE
[ https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302081#comment-15302081 ] ASF GitHub Bot commented on FLINK-3908: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/2007#issuecomment-221876052 I think I meant it a bit differently than I guess you understood it. It is basically the exact same implementation you have now, only with different method names. > FieldParsers error state is not reset correctly to NONE > --- > > Key: FLINK-3908 > URL: https://issues.apache.org/jira/browse/FLINK-3908 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: parser > > If during the parse of a csv there's a parse error (for example when in a > integer column there are non-int values) the errorState is not reset > correctly in the next parseField call. A simple fix would be to add as a > first statement of the {{parseField()}} function a call to > {{setErrorState(ParseErrorState.NONE)}} but it is something that should be > handled better (by default) for every subclass of {{FieldParser}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3978) Add contains methods to RuntimeContext
Greg Hogan created FLINK-3978: - Summary: Add contains methods to RuntimeContext Key: FLINK-3978 URL: https://issues.apache.org/jira/browse/FLINK-3978 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor The javadocs for `RuntimeContext` state that `getAccumulator` "throws an exception if the accumulator does not exist or if the accumulator exists, but with different type", although `AbstractRuntimeUDFContext` does not throw an exception but will return null. The javadocs for `getBroadcastVariable` do not mention throwing an exception. Currently the only way to handle a broadcast variable that that may or may not exist is to catch and ignore the exception. Adding a `containsBroadcastVariable` method to `RuntimeContext` would make this explicit. Likewise, `containsAccumulator`. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3908] Fixed Parser's error state reset
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/2007#issuecomment-221876052 I think I meant it a bit differently than I guess you understood it. It is basically the exact same implementation you have now, only with different method names. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user dawidwys commented on the pull request: https://github.com/apache/flink/pull/1981#issuecomment-221875998 I applied your changes @fhueske . If you have any comments regarding the equi-join validation I will address/apply them in the evening.
[jira] [Commented] (FLINK-3868) Specialized CopyableValue serializers and comparators
[ https://issues.apache.org/jira/browse/FLINK-3868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302075#comment-15302075 ] ASF GitHub Bot commented on FLINK-3868: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1983#issuecomment-221873359 Thanks, Greg, this looks good all in all! A few things we should do before merging this, in my opinion: - I think all classes should be annotated with `@Internal`, because they should not be used directly by users. - The `NullValueComparator` can be simplified to not really operate on the values at all. All NullValues are always the same. They are all equal, hash to a constant value, etc. - We were trying to get rid of the `Record` type - it was part of a very old legacy API. It is still in there because some people ended up using it, but I would like to add as little dependency on it as possible. Hence I'd suggest to drop the `RecordSerializer`. > Specialized CopyableValue serializers and comparators > - > > Key: FLINK-3868 > URL: https://issues.apache.org/jira/browse/FLINK-3868 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > This need was discussed on the mailing list [1] and will be obviated by code > generation for POJO serializers and comparators [2] (as I understand, i.e., > {{LongValue}} will now simply be treated as a POJO which happens to contain a > {{long}} and a specialized serializer and comparator will be generated). > In the meantime, and since {{ValueTypeInfo}} will need to be reworked to use > the new generators, I think it is worthwhile to add specialized serializers > and comparators for the {{CopyableValue}} types. > This will also provide another point of comparison for the performance of the > generated serializers and comparators. 
> [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] https://issues.apache.org/jira/browse/FLINK-3599
[GitHub] flink pull request: [FLINK-3868] [core] Specialized CopyableValue ...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1983#issuecomment-221873359 Thanks, Greg, this looks good all in all! A few things we should do before merging this, in my opinion: - I think all classes should be annotated with `@Internal`, because they should not be used directly by users. - The `NullValueComparator` can be simplified to not really operate on the values at all. All NullValues are always the same. They are all equal, hash to a constant value, etc. - We were trying to get rid of the `Record` type - it was part of a very old legacy API. It is still in there because some people ended up using it, but I would like to add as little dependency on it as possible. Hence I'd suggest to drop the `RecordSerializer`.
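The simplification suggested above can be sketched as follows. Since every `NullValue` is identical, the comparator never needs to inspect the value at all; the classes below are a simplified illustration, not the PR's actual code:

```java
// Simplified illustration of a comparator for a singleton-like value type:
// all instances are equal, so comparisons are constant and the hash is fixed.
public class NullValueComparatorSketch {
    /** Marker type with a single logical value, like Flink's NullValue. */
    public static final class NullValue {
        public static final NullValue INSTANCE = new NullValue();
        private NullValue() {}
    }

    public int compare(NullValue first, NullValue second) {
        return 0; // all NullValues are equal; no field access needed
    }

    public int hash(NullValue value) {
        return 53; // any fixed constant works; the value is never inspected
    }

    public boolean equalToReference(NullValue candidate) {
        return true; // every NullValue matches every reference value
    }
}
```

Because none of the methods touch the argument, the comparator does no per-record work, which is the point of the suggested simplification.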
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1981#discussion_r64742780 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -342,21 +368,29 @@ case class Join( } private def testJoinCondition(expression: Expression): Unit = { -def checkIfJoinCondition(exp : Expression) = if (exp.children.exists(!_.isInstanceOf[JoinFieldReference])) { - failValidation(s"Only join predicates supported. For non-join predicates use Table#where.") -} +def checkIfJoinCondition(exp : BinaryComparison) = + if (exp.children match { --- End diff -- The condition looks good but can be a bit simplified like this: ``` exp.children match { case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil if x.isFromLeftInput != y.isFromLeftInput => // predicate references both inputs. All good! case _ => failValidation( s"Only join predicates supported. For non-join predicates use Table#where.") } ```
[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302050#comment-15302050 ] ASF GitHub Bot commented on FLINK-3655: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1990#issuecomment-221866397 Thanks for opening that contribution. Can you sum up the changes you made? That would make the review easier. The changes look quite extensive. My gut feeling would be that it should not require so many changes, ideally only an additional loop in the "generate splits" method, and possibly in the "statistics" method. > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > Labels: starter > > Allow comma-separated or multiple directories to be specified for > FileInputFormat so that a DataSource will process the directories > sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus.
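The "additional loop in the generate splits method" mentioned in the comment above could look roughly like the sketch below. `createSplitsForPath` is a hypothetical helper standing in for the existing single-path split logic; this is not actual `FileInputFormat` code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: expand a comma-separated path spec and generate splits per
// directory. createSplitsForPath stands in for the existing single-path
// split generation; here it just records the path so the loop is testable.
public class MultiPathSplitSketch {
    public static List<String> generateSplits(String pathSpec) {
        List<String> splits = new ArrayList<>();
        // the extra loop: one pass of the existing logic per directory
        for (String path : pathSpec.split(",")) {
            path = path.trim();
            if (!path.isEmpty()) {
                splits.addAll(createSplitsForPath(path));
            }
        }
        return splits;
    }

    // hypothetical stand-in for per-directory split generation
    private static List<String> createSplitsForPath(String path) {
        return Arrays.asList("split:" + path);
    }
}
```

The statistics method would get the same treatment: accumulate file sizes and modification times over the same expanded list of paths instead of a single one.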
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1981#discussion_r64741369 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -272,23 +272,46 @@ case class Join( left.output ++ right.output } + private object JoinFieldReference { + +def apply( + name: String, + resultType: TypeInformation[_], + left: LogicalNode, + right: LogicalNode): JoinFieldReference = { + + val joinInputField = left.output.zipWithIndex.find(_._1.name == name).map(_._2) + .orElse(right.output.zipWithIndex.find(_._1.name == name).map(x => left.output.length + x._2)) + .getOrElse( + throw new NoSuchElementException(s"""Could not find field: $name""")) + .asInstanceOf[Int] + + new JoinFieldReference(name, resultType, left, right, joinInputField) +} + + } + private case class JoinFieldReference( --- End diff -- I am sorry, I think my last comment was confusing. Actually, I misinterpreted the semantics of the `RelBuilder.field(int, int, String)` method. I thought it would automatically add the offset of the left input. But apparently it doesn't... 
How about we change the `JoinFieldReference` to this: ``` private case class JoinFieldReference( name: String, resultType: TypeInformation[_], left: LogicalNode, right: LogicalNode) extends Attribute { val isFromLeftInput = left.output.map(_.name).contains(name) val (indexInInput, indexInJoin) = if (isFromLeftInput) { val indexInLeft = left.output.map(_.name).indexOf(name) (indexInLeft, indexInLeft) } else { val indexInRight = right.output.map(_.name).indexOf(name) (indexInRight, indexInRight + left.output.length) } override def toString = s"'$name" override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { // look up type of field val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType() // create a new RexInputRef with index offset new RexInputRef(indexInJoin, fieldType) } override def withName(newName: String): Attribute = { if (newName == name) { this } else { JoinFieldReference(newName, resultType, left, right) } } } ```
[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302041#comment-15302041 ] ASF GitHub Bot commented on FLINK-3922: --- Github user fpompermaier commented on the pull request: https://github.com/apache/flink/pull/2011#issuecomment-221864540 From my tests it does, but I think it is something that should have a unit test > Infinite recursion on TypeExtractor > --- > > Key: FLINK-3922 > URL: https://issues.apache.org/jira/browse/FLINK-3922 > Project: Flink > Issue Type: Bug > Components: Type Serialization System > Affects Versions: 1.0.2 > Reporter: Flavio Pompermaier > Assignee: Timo Walther > Priority: Critical > > This program causes a StackOverflow (infinite recursion) in the TypeExtractor (generic type parameters below are reconstructed, as the archived message stripped the angle-bracketed text):
> {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid}
> public class TypeSerializerStackOverflowOnRecursivePojo {
>   public static class RecursivePojo<K, V> implements Serializable {
>     private static final long serialVersionUID = 1L;
>     private RecursivePojo<K, V> parent;
>     public RecursivePojo() {}
>     public RecursivePojo(K k, V v) {}
>     public RecursivePojo<K, V> getParent() { return parent; }
>     public void setParent(RecursivePojo<K, V> parent) { this.parent = parent; }
>   }
>   public static class TypedTuple extends Tuple3<String, String, RecursivePojo<String, Map<String, String>>> {
>     private static final long serialVersionUID = 1L;
>   }
>   public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     env.fromCollection(Arrays.asList(new RecursivePojo<String, Map<String, String>>("test", new HashMap<>())))
>       .map(t -> { TypedTuple ret = new TypedTuple(); ret.setFields("1", "1", t); return ret; }).returns(TypedTuple.class)
>       .print();
>   }
> }
> {code}
> The thrown Exception is the following:
> {code:title=Exception thrown}
> Exception in thread "main" java.lang.StackOverflowError
> at >
sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328) > at > sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310) > at > sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289) > at > sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283) > at > sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485) > at > sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627) > at > sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577) > at > sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171) > at > sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55) > at > sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43) > at > sun.reflect.generics.repository.AbstractRepository.<init>(AbstractRepository.java:74) > at > sun.reflect.generics.repository.GenericDeclRepository.<init>(GenericDeclRepository.java:49) > at > sun.reflect.generics.repository.ConstructorRepository.<init>(ConstructorRepository.java:51) > at > sun.reflect.generics.repository.MethodRepository.<init>(MethodRepository.java:46) > at > sun.reflect.generics.repository.MethodRepository.make(MethodRepository.java:59) > at java.lang.reflect.Method.getGenericInfo(Method.java:102) > at java.lang.reflect.Method.getGenericReturnType(Method.java:255) > at > org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1610) > at > org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1671) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732) > at >
org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732) > at >