[GitHub] flink pull request: [FLINK-1979] Lossfunctions
Github user skavulya commented on the pull request: https://github.com/apache/flink/pull/656#issuecomment-218651924 thanks @thvasilo! I created the PR https://github.com/apache/flink/pull/1985 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and...
GitHub user skavulya opened a pull request: https://github.com/apache/flink/pull/1985 [FLINK-1979] Add logistic loss, hinge loss and regularization penalties

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

…zation

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skavulya/flink loss-functions2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1985.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1985

commit 014108ac9ea2e46ac021e9bf3824624d54357f74
Author: spkavuly
Date: 2016-05-11T22:55:13Z

    Add logistic loss, hinge loss and regularization penalties for optimization
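For readers unfamiliar with the terms in this PR title, here is a minimal sketch of what logistic loss, hinge loss, and an L2 regularization penalty compute. The class and method names below are illustrative, not the actual Flink ML API (which is written in Scala); labels are assumed to be in {-1, +1}.

```java
// Illustrative sketch of the loss functions named in FLINK-1979.
// Not the Flink ML API; names and signatures are hypothetical.
public class LossSketch {

    // Logistic loss: log(1 + exp(-y * prediction)), computed in a
    // numerically stable way for large |y * prediction|.
    static double logisticLoss(double prediction, double label) {
        double z = prediction * label;
        if (z > 18) return Math.exp(-z);   // log1p(exp(-z)) ~ exp(-z)
        if (z < -18) return -z;            // log1p(exp(-z)) ~ -z
        return Math.log1p(Math.exp(-z));
    }

    // Hinge loss: max(0, 1 - y * prediction), as used by linear SVMs.
    static double hingeLoss(double prediction, double label) {
        return Math.max(0.0, 1.0 - prediction * label);
    }

    // L2 regularization penalty: 0.5 * lambda * ||w||^2.
    static double l2Penalty(double[] weights, double lambda) {
        double sq = 0.0;
        for (double w : weights) sq += w * w;
        return 0.5 * lambda * sq;
    }
}
```

A correctly classified point well past the margin (prediction 2.0, label +1) incurs zero hinge loss but still a small logistic loss, which is the usual contrast between the two.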
[GitHub] flink pull request: [FLINK-3768] [gelly] Clustering Coefficient
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1896#issuecomment-218637517 Thanks for the docs update.
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/1984 [FLINK-3889] Make File Monitoring Function checkpointable.

This pull request introduces the underlying functionality to make Streaming File Sources persistent. It does not yet change the API calls, as this will be done after agreeing on the current architecture and implementation. In addition, this PR includes a commit for FLINK-3896. This allows an operator to cancel its containing task. The need for this functionality came up during a discussion with @StephanEwen and @aljoscha, and it is a separate commit.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink ft_files

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1984.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1984

commit 7deb92236cec47ddcfbb3abfa396fd9d15f770b7
Author: kl0u
Date: 2016-05-10T16:56:58Z

    [FLINK-3896] Allow a StreamTask to be Externally Cancelled

    It adds a method failExternally() to the StreamTask, so that custom Operators can make their containing task fail when needed.

commit c9682b7606451c4eecf6f2f6df9a498fb6d39577
Author: kl0u
Date: 2016-04-10T14:56:42Z

    [FLINK-3717] Make FileInputFormat checkpointable

    This adds a new interface called CheckpointableInputFormat which describes input formats whose state is queryable, i.e. getCurrentChannelState() returns where the reader is in the underlying source, and they can resume reading from a user-specified position. This functionality is not yet leveraged by current readers.

commit cbbfd8d7e6db0f8f114675b4047aecb94996e500
Author: kl0u
Date: 2016-04-18T14:37:54Z

    [FLINK-3889][FLINK-3808] Refactor File Monitoring Source

    This is meant to replace the different file reading sources in Flink streaming. Now there is one monitoring source with DOP 1 monitoring a directory and assigning input splits to downstream readers. In addition, it makes the new features added by FLINK-3717 and FLINK-3808 work together. Now we have a file monitoring source that is also fault tolerant and can guarantee exactly-once semantics. This does not replace the old API calls. This will be done in a future commit.
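The CheckpointableInputFormat idea described in the FLINK-3717 commit can be sketched in a few lines: a format that can report where it is in the underlying source and later resume from that position. The interface and the toy line-reading format below are illustrative stand-ins, not the actual Flink classes.

```java
// Illustrative sketch of a checkpointable input format, in the spirit of
// FLINK-3717. Names are hypothetical; the real Flink interface may differ.
interface CheckpointableInputFormat<T, S> {
    S getCurrentChannelState(); // where the reader is in the underlying source
    void reopen(S state);       // resume reading from a reported position
}

// Toy implementation over an in-memory array of lines, resuming by line index.
class LineFormat implements CheckpointableInputFormat<String, Integer> {
    private final String[] lines;
    private int pos = 0;

    LineFormat(String[] lines) { this.lines = lines; }

    public Integer getCurrentChannelState() { return pos; }
    public void reopen(Integer state) { pos = state; }

    boolean reachedEnd() { return pos >= lines.length; }
    String nextRecord() { return lines[pos++]; }
}
```

On recovery, the source hands the format the state it reported at the last checkpoint, and reading continues from exactly that record.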
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280969#comment-15280969 ]

Tzu-Li (Gordon) Tai commented on FLINK-3211:

[~rmetzger] Hi Robert, can you tell me how you were testing the Kinesis connector in AWS EMR before? I'd like to try it myself too when I get back, also to learn the process :) Thanks.

> Add AWS Kinesis streaming connector
> -----------------------------------
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a cloud service version of Apache Kafka. Support for AWS Kinesis will be a great addition to the handful of Flink's streaming connectors to external systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK can be used to consume Kinesis data, including reading a stream from a specific offset (or "record sequence number" in Kinesis terminology). On the other hand, AWS officially recommends using the KCL, which offers a higher level of abstraction that also comes with checkpointing and failure recovery, using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state storage.
> However, the KCL is essentially a stream processing library that wraps all the partition-to-task (or "shard" in Kinesis terminology) determination and checkpointing to allow the user to focus only on streaming application logic. This leads to the understanding that we cannot use the KCL to implement the Kinesis streaming connector if we are aiming for a deep integration of Flink with Kinesis that provides exactly-once guarantees (the KCL promises only at-least-once). Therefore, the AWS SDK will be the way to go for the implementation of this feature.
> With the ability to read from specific offsets, and also the fact that Kinesis and Kafka share a lot of similarities, the basic principles of the implementation of Flink's Kinesis streaming connector will very much resemble the Kafka connector. We can basically follow the outlines described in [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to some of the Kinesis vs. Kafka differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I currently don't think this is a good idea for Kinesis streams (a Kinesis stream is logically equivalent to a Kafka topic). Kinesis streams can exist in different AWS regions, and each Kinesis stream under the same AWS user account may have completely independent access settings with different authorization keys. Overall, a Kinesis stream feels like a much more consolidated resource compared to Kafka topics. It would be great to hear more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to the Kafka connector having per-broker connections where the connections can handle multiple Kafka partitions, the Kinesis connector will only need to have simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we were to implement this feature like the Kafka connector, with Kafka / ZK to sync an outside view of progress, we could probably use ZK or DynamoDB the way the KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis Producer Library) [5]. However, for higher code consistency with the proposed Kinesis consumer, I think it will be better to stick with the AWS SDK for the implementation. The implementation should be straightforward, being almost if not completely the same as the Kafka sink.
> References:
> [1] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
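The per-shard consumption described in point 2 above can be sketched as a loop that tracks the last-emitted sequence number, which is exactly what a checkpoint would need to store to resume. The ShardReader interface below is an illustrative stand-in for the AWS SDK's shard-iterator calls; none of these names are the eventual connector API.

```java
// Illustrative sketch of a per-shard consumer that tracks sequence numbers
// for checkpoint/restore. All names here are hypothetical.
import java.util.List;

interface ShardReader {
    // Returns [sequenceNumber, data] pairs after the given sequence number.
    List<String[]> getRecords(String afterSequenceNumber);
}

class ShardConsumer {
    private String lastSequenceNumber; // snapshot this for exactly-once restore

    ShardConsumer(String startAfter) { this.lastSequenceNumber = startAfter; }

    // One poll: emit records, advancing the sequence number as we go.
    void poll(ShardReader reader, List<String> out) {
        for (String[] rec : reader.getRecords(lastSequenceNumber)) {
            out.add(rec[1]);
            lastSequenceNumber = rec[0];
        }
    }

    String snapshot() { return lastSequenceNumber; }
}
```

On failure, restoring `snapshot()` and asking the shard for records after that sequence number resumes reading without re-emitting already-checkpointed records.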
[jira] [Updated] (FLINK-3715) Move Accumulating/Discarding from Trigger to WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas updated FLINK-3715:
----------------------------------
Description:
As mentioned in https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.pyk1sn8f33q2 we should move the decision of whether to {{PURGE}} a window upon firing from the {{Trigger}} to the {{WindowOperator}}. This also requires adding API so that the user can specify whether windows should be purged upon trigger firing (discarding) or kept (accumulating).
As mentioned in the above doc, the {{Trigger}} can react with 4 results right now: {{CONTINUE}}, {{FIRE}}, {{PURGE}}, {{FIRE_AND_PURGE}}. The behavior of a trigger is not apparent without looking at the code of the trigger, and this has confused a number of users. With the new regime, a {{Trigger}} can just decide whether to {{CONTINUE}} or {{FIRE}}. The setting of accumulating/discarding decides whether to purge the window or keep it.
This depends on FLINK-3714, where we introduce an "allowed lateness" setting. Having a choice between accumulating and discarding only makes sense with an allowed lateness greater than zero. Otherwise no late elements could ever arrive that would go into the kept windows.

was:
As mentioned in https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.pyk1sn8f33q2 we should move the decision of whether to {{PURGE}} a window upon firing from the {{Trigger}} to the {{WindowOperator}}. This also requires adding API so that the user can specify whether windows should be purged upon trigger firing (discarding) or kept (accumulating).
As mentioned in the above doc, the {{Trigger}} can react with 4 results right now: {{CONTINUE}}, {{FIRE}}, {{PURGE}}, {{FIRE_AND_PURGE}}. The behavior of a trigger is not apparent without looking at the code of the trigger, and this has confused a number of users. With the new regime, a {{Trigger}} can just decide whether to {{CONTINUE}} or {{FIRE}}. The setting of accumulating/discarding decides whether to purge the window or keep it.
This depends on FLINK-3714, where we introduce an "allowed lateness" setting. Having a choice between accumulating and discarding only makes sense with an allowed lateness greater than zero. Otherwise no late elements could ever arrive that would go into the kept windows.

> Move Accumulating/Discarding from Trigger to WindowOperator
> -----------------------------------------------------------
>
> Key: FLINK-3715
> URL: https://issues.apache.org/jira/browse/FLINK-3715
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
>
> As mentioned in https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.pyk1sn8f33q2 we should move the decision of whether to {{PURGE}} a window upon firing from the {{Trigger}} to the {{WindowOperator}}. This also requires adding API so that the user can specify whether windows should be purged upon trigger firing (discarding) or kept (accumulating).
> As mentioned in the above doc, the {{Trigger}} can react with 4 results right now: {{CONTINUE}}, {{FIRE}}, {{PURGE}}, {{FIRE_AND_PURGE}}. The behavior of a trigger is not apparent without looking at the code of the trigger, and this has confused a number of users. With the new regime, a {{Trigger}} can just decide whether to {{CONTINUE}} or {{FIRE}}. The setting of accumulating/discarding decides whether to purge the window or keep it.
> This depends on FLINK-3714, where we introduce an "allowed lateness" setting. Having a choice between accumulating and discarding only makes sense with an allowed lateness greater than zero. Otherwise no late elements could ever arrive that would go into the kept windows.
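The proposed split of responsibilities can be sketched in a few lines: the trigger only decides whether to fire, and the operator's accumulating/discarding mode alone decides whether the fired window's contents are purged. These types are illustrative, not the actual Flink API.

```java
// Illustrative sketch of the FLINK-3715 proposal: the Trigger is reduced to
// CONTINUE/FIRE, and purging becomes an operator-level setting.
enum TriggerResult { CONTINUE, FIRE }

enum WindowMode { ACCUMULATING, DISCARDING }

class WindowOperatorSketch {
    private final WindowMode mode;

    WindowOperatorSketch(WindowMode mode) { this.mode = mode; }

    // Purge only when the trigger fired AND the operator is discarding.
    // An accumulating operator keeps the window so late elements can join it.
    boolean shouldPurge(TriggerResult result) {
        return result == TriggerResult.FIRE && mode == WindowMode.DISCARDING;
    }
}
```

Under this regime a trigger author no longer chooses among four results, which addresses the confusion the description mentions.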
[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm
[ https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280826#comment-15280826 ]

Greg Hogan commented on FLINK-3879:
-----------------------------------
The current expectation for FLINK-2044 is that the user will have some intuition on how many iterations to run. If there is a specific best value then it should be hard-coded in the program; otherwise, we should provide guidance on how to select the number of iterations. I do not see discussion of a convergence threshold in the HITS paper, but borrowing this aspect of PageRank seems appropriate. A threshold of 1E-9 requires that the average score change by less than one part in a billion. That seems easier to conceptualize than setting the number of iterations.
I think a benchmark is always good for context when comparing implementations. When does Scatter-Gather outperform GSA? I see that some algorithms cannot be implemented using GSA, but that is clearly not the case for these algorithms.
The only delta iteration I am familiar with is Stephan's implementation of PageRank. Are you thinking of a similar modification to HITS? It should be straightforward to implement this and compare techniques for HITS and PageRank. Convergence should not be affected, but vertex scores below a given threshold will stop propagating deltas. https://gist.github.com/StephanEwen/2b1a4c9812ac46abc8f0

> Native implementation of HITS algorithm
> ---------------------------------------
>
> Key: FLINK-3879
> URL: https://issues.apache.org/jira/browse/FLINK-3879
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is presented in [0] and described in [1].
> "[HITS] is a very popular and effective algorithm to rank documents based on the link information among a set of documents. The algorithm presumes that a good hub is a document that points to many others, and a good authority is a document that many documents point to." [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf]
> This implementation differs from FLINK-2044 by providing for convergence, outputting both hub and authority scores, and completing in half the number of iterations.
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf
> [1] https://en.wikipedia.org/wiki/HITS_algorithm
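The convergence criterion discussed in the comment, such as a 1E-9 threshold on the average score change, can be sketched as a simple check between two successive score vectors. This helper is illustrative, not part of Gelly; in the actual implementation the sum of changes would be collected through an iteration aggregator rather than from plain arrays.

```java
// Illustrative convergence check in the spirit of the discussion above:
// stop iterating once the average per-vertex score change falls below
// a threshold (e.g. 1e-9, "one part in a billion").
class ConvergenceCheck {
    static boolean converged(double[] prev, double[] curr, double threshold) {
        double totalDelta = 0.0;
        for (int i = 0; i < prev.length; i++) {
            totalDelta += Math.abs(curr[i] - prev[i]);
        }
        return totalDelta / prev.length < threshold;
    }
}
```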
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-218543446 @greghogan thanks, and the relevant code has been modified. :)
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280541#comment-15280541 ]

ASF GitHub Bot commented on FLINK-2044:
---------------------------------------
Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-218543446 @greghogan thanks, and the relevant code has been modified. :)

> Implementation of Gelly HITS Algorithm
> --------------------------------------
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Reporter: Ahamd Javid
> Assignee: GaoLun
> Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch can be found here: (https://github.com/JavidMayar/flink/commits/HITS)
[jira] [Commented] (FLINK-3868) Specialized CopyableValue serializers and comparators
[ https://issues.apache.org/jira/browse/FLINK-3868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280496#comment-15280496 ]

ASF GitHub Bot commented on FLINK-3868:
---------------------------------------
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/1983 [FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators, many of which were already present.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 3868_specialized_copyablevalue_serializers_and_comparators

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1983.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1983

commit c3359933b67f4488e1ed7cd4f2632ee21cdb548e
Author: Greg Hogan
Date: 2016-05-04T20:56:16Z

    [FLINK-3868] [core] Specialized CopyableValue serializers and comparators

    Update ValueTypeInfo to use specialized serializers and comparators, many of which were already present.

> Specialized CopyableValue serializers and comparators
> -----------------------------------------------------
>
> Key: FLINK-3868
> URL: https://issues.apache.org/jira/browse/FLINK-3868
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> This need was discussed on the mailing list [1] and will be obviated by code generation for POJO serializers and comparators [2] (as I understand, i.e., {{LongValue}} will now simply be treated as a POJO which happens to contain a {{long}}, and a specialized serializer and comparator will be generated).
> In the meantime, and since {{ValueTypeInfo}} will need to be reworked to use the new generators, I think it is worthwhile to add specialized serializers and comparators for the {{CopyableValue}} types.
> This will also provide another point of comparison for the performance of the generated serializers and comparators.
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://issues.apache.org/jira/browse/FLINK-3599
[GitHub] flink pull request: [FLINK-3868] [core] Specialized CopyableValue ...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/1983 [FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators, many of which were already present.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 3868_specialized_copyablevalue_serializers_and_comparators

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1983.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1983

commit c3359933b67f4488e1ed7cd4f2632ee21cdb548e
Author: Greg Hogan
Date: 2016-05-04T20:56:16Z

    [FLINK-3868] [core] Specialized CopyableValue serializers and comparators

    Update ValueTypeInfo to use specialized serializers and comparators, many of which were already present.
[jira] [Updated] (FLINK-3868) Specialized CopyableValue serializers and comparators
[ https://issues.apache.org/jira/browse/FLINK-3868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan updated FLINK-3868:
------------------------------
Fix Version/s: 1.1.0

> Specialized CopyableValue serializers and comparators
> -----------------------------------------------------
>
> Key: FLINK-3868
> URL: https://issues.apache.org/jira/browse/FLINK-3868
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> This need was discussed on the mailing list [1] and will be obviated by code generation for POJO serializers and comparators [2] (as I understand, i.e., {{LongValue}} will now simply be treated as a POJO which happens to contain a {{long}} and a specialized serializer and comparator will be generated).
> In the meantime, and since {{ValueTypeInfo}} will need to be reworked to use the new generators, I think it is worthwhile to add specialized serializers and comparators for the {{CopyableValue}} types.
> This will also provide another point of comparison for the performance of the generated serializers and comparators.
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://issues.apache.org/jira/browse/FLINK-3599
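What "specialized serializer" means here can be sketched for a LongValue-like type: write the primitive directly with a fixed 8-byte encoding, avoiding reflective POJO machinery and type tags. The classes below are illustrative stand-ins, not Flink's actual LongValueSerializer or its DataOutputView API.

```java
// Illustrative sketch of a specialized serializer for a mutable long wrapper,
// in the spirit of FLINK-3868. Names are hypothetical.
import java.nio.ByteBuffer;

class LongValueLite {
    long value;
    LongValueLite(long v) { value = v; }
}

class LongValueLiteSerializer {
    // Fixed record length lets comparators compare serialized bytes in place.
    int getLength() { return 8; }

    void serialize(LongValueLite record, ByteBuffer target) {
        target.putLong(record.value); // just the primitive, no type tags
    }

    LongValueLite deserialize(ByteBuffer source) {
        return new LongValueLite(source.getLong());
    }
}
```

The fixed-length, tag-free layout is the main point of comparison against generic POJO serialization mentioned in the issue.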
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-218511092 @fhueske @twalthr thanks for the review work! I've updated the code just now. Looking forward to more feedback on this :)
[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280348#comment-15280348 ]

ASF GitHub Bot commented on FLINK-3836:
---------------------------------------
Github user mbode closed the pull request at: https://github.com/apache/flink/pull/1966

> Change Histogram to enable Long counters
> ----------------------------------------
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.0.2
> Reporter: Maximilian Bode
> Assignee: Maximilian Bode
> Priority: Minor
>
> Change flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java to enable Long counts instead of Integer. In particular, change the TreeMap to hold Long counts.
[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280349#comment-15280349 ]

ASF GitHub Bot commented on FLINK-3836:
---------------------------------------
GitHub user mbode reopened a pull request: https://github.com/apache/flink/pull/1966 [FLINK-3836] Add LongHistogram accumulator

New accumulator `LongHistogram`; the `Histogram` accumulator now throws an `IllegalArgumentException` instead of letting the int overflow.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbode/flink master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1966.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1966

commit f457319481701a1234c9ea7d29da24f857ae4241
Author: Maximilian Bode
Date: 2016-04-27T15:19:16Z

    [Flink-3836] Add LongHistogram accumulator

> Change Histogram to enable Long counters
> ----------------------------------------
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.0.2
> Reporter: Maximilian Bode
> Assignee: Maximilian Bode
> Priority: Minor
>
> Change flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java to enable Long counts instead of Integer. In particular, change the TreeMap to hold Long counts.
[GitHub] flink pull request: [FLINK-3836] Add LongHistogram accumulator
GitHub user mbode reopened a pull request: https://github.com/apache/flink/pull/1966 [FLINK-3836] Add LongHistogram accumulator

New accumulator `LongHistogram`; the `Histogram` accumulator now throws an `IllegalArgumentException` instead of letting the int overflow.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbode/flink master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1966.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1966

commit f457319481701a1234c9ea7d29da24f857ae4241
Author: Maximilian Bode
Date: 2016-04-27T15:19:16Z

    [Flink-3836] Add LongHistogram accumulator
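The two behaviors this PR describes can be sketched as follows: a histogram with Long counts, and an int-counting histogram that throws instead of silently overflowing. These classes are illustrative, not the actual accumulator implementations.

```java
// Illustrative sketch of the two behaviors described in the PR above.
// Not the actual Flink accumulators; names are hypothetical.
import java.util.TreeMap;

class LongHistogramSketch {
    final TreeMap<Integer, Long> counts = new TreeMap<>();

    void add(int value) {
        counts.merge(value, 1L, Long::sum); // Long counts cannot overflow int
    }
}

class IntHistogramSketch {
    final TreeMap<Integer, Integer> counts = new TreeMap<>();

    void add(int value) {
        int current = counts.getOrDefault(value, 0);
        if (current == Integer.MAX_VALUE) {
            // Fail loudly rather than wrap around to a negative count.
            throw new IllegalArgumentException("histogram count would overflow int");
        }
        counts.put(value, current + 1);
    }
}
```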
[GitHub] flink pull request: [FLINK-3836] Add LongHistogram accumulator
Github user mbode closed the pull request at: https://github.com/apache/flink/pull/1966
[jira] [Closed] (FLINK-2310) Add an Adamic-Adar Similarity example
[ https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan closed FLINK-2310.
-----------------------------
Resolution: Later

Work on this algorithm is continuing in FLINK-3898.

> Add an Adamic-Adar Similarity example
> -------------------------------------
>
> Key: FLINK-2310
> URL: https://issues.apache.org/jira/browse/FLINK-2310
> Project: Flink
> Issue Type: Task
> Components: Gelly
> Reporter: Andra Lungu
> Assignee: Shivani Ghatge
> Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a set of nodes. However, instead of counting the common neighbors and dividing them by the total number of neighbors, the similarity is weighted according to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
> The Adamic-Adar algorithm can be broken into three steps:
> 1). For each vertex, compute the log of its inverse degree (with the formula above) and set it as the vertex value.
> 2). Each vertex will then send this new computed value, along with a list of neighbors, to the targets of its out-edges.
> 3). Weigh the edges with the Adamic-Adar index: sum over n from CN of log(1/k_n) (CN is the set of all common neighbors of two vertices x, y; k_n is the degree of node n). See [2].
> Prerequisites:
> - Full understanding of the Jaccard Similarity Measure algorithm
> - Reading the associated literature:
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280280#comment-15280280 ] ASF GitHub Bot commented on FLINK-2044: --- Github user gallenvara commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62868637
--- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of the HITS algorithm, using a scatter-gather iteration.
+ * The user can define the maximum number of iterations. The HITS algorithm assigns two scores to each
+ * vertex, a hub score and an authority score. A good hub represents a page that points to many other
+ * pages, and a good authority represents a page that is linked by many different hubs.
+ * Each vertex has a value of Tuple2 type: the first field is the hub score and the second field is the
+ * authority score. The implementation assumes that the two scores are the same for each vertex at the beginning.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
+public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
+
+	private int maxIterations;
+
+	public HITSAlgorithm(int maxIterations) {
+		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
+		this.maxIterations = maxIterations * 2 + 1;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
+
+		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
+		parameter.setDirection(EdgeDirection.ALL);
+		parameter.registerAggregator("sumVertexValue", new DoubleSumAggregator());
+
+		return netGraph
+			.mapVertices(new VertexInitMapper<K, VV>())
+			.mapEdges(new NullValueEdgeMapper<K, EV>())
--- End diff --
Yes, and I will modify the code.
> Implementation of Gelly HITS Algorithm
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Reporter: Ahamd Javid
> Assignee: GaoLun
> Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch can be found here: (https://github.com/JavidMayar/flink/commits/HITS)
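For reference, the `maxIterations * 2 + 1` in the quoted constructor reflects that one logical HITS round costs two supersteps in the scatter-gather model: one to update hub scores and one to update authority scores. Outside of that abstraction, a single HITS round can be sketched in plain Java as follows (illustrative only; class and method names are invented):

```java
public class Hits {

    // One full HITS round on a directed graph given as an edge list:
    // each authority score becomes the sum of the hub scores of its
    // in-neighbors, each hub score becomes the sum of the (new) authority
    // scores of its out-neighbors, and both vectors are L1-normalized.
    public static void round(int[][] edges, double[] hub, double[] auth) {
        double[] newAuth = new double[auth.length];
        double[] newHub = new double[hub.length];
        for (int[] e : edges) {
            newAuth[e[1]] += hub[e[0]];   // edge e[0] -> e[1]
        }
        for (int[] e : edges) {
            newHub[e[0]] += newAuth[e[1]];
        }
        normalize(newAuth);
        normalize(newHub);
        System.arraycopy(newAuth, 0, auth, 0, auth.length);
        System.arraycopy(newHub, 0, hub, 0, hub.length);
    }

    static void normalize(double[] v) {
        double sum = 0.0;
        for (double x : v) sum += x;
        if (sum > 0) {
            for (int i = 0; i < v.length; i++) v[i] /= sum;
        }
    }
}
```

Repeating `round` until the score vectors stop changing gives the converged hub and authority rankings.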
[jira] [Closed] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters
[ https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri closed FLINK-2375. Resolution: Won't Fix This technique can be considered as part of FLINK-3898. > Add Approximate Adamic Adar Similarity method using BloomFilters > > > Key: FLINK-2375 > URL: https://issues.apache.org/jira/browse/FLINK-2375 > Project: Flink > Issue Type: Task > Components: Gelly >Reporter: Shivani Ghatge >Assignee: Shivani Ghatge >Priority: Minor > > Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a > set of nodes. However, instead of counting the common neighbors and dividing > them by the total number of neighbors, the similarity is weighted according > to the vertex degrees. In particular, it's equal to log(1/numberOfEdges). > The Adamic-Adar algorithm can be broken into three steps: > 1). For each vertex, compute the log of its inverse degrees (with the formula > above) and set it as the vertex value. > 2). Each vertex will then send this new computed value along with a list of > neighbors to the targets of its out-edges > 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of > log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is > the degree of node n). See [2] > Using BloomFilters we increase the scalability of the algorithm. The values > calculated for the edges will be approximate. > Prerequisites: > Full understanding of the Jaccard Similarity Measure algorithm > Reading the associated literature: > [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf > [2] > http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2634) Add a Vertex-centric Version of the Triangle Count Library Method
[ https://issues.apache.org/jira/browse/FLINK-2634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri closed FLINK-2634. Resolution: Won't Fix. There is currently no need or demand for this version. The current TriangleEnumerator covers this functionality.
> Add a Vertex-centric Version of the Triangle Count Library Method
> Key: FLINK-2634
> URL: https://issues.apache.org/jira/browse/FLINK-2634
> Project: Flink
> Issue Type: Task
> Components: Gelly
> Affects Versions: 0.10.0
> Reporter: Andra Lungu
> Assignee: Andra Lungu
> Priority: Minor
>
> The vertex-centric version of this algorithm receives an undirected graph as input and outputs the total number of triangles formed by the graph's edges.
> The implementation consists of three phases:
> 1). Select neighbours with id greater than the current vertex id.
> 2). Propagate each received value to neighbours with higher id.
> 3). Compute the number of triangles by verifying if the final vertex contains the sender's id in its list.
> As opposed to the GAS version, all these three steps will be performed via message passing.
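The three phases in the issue description amount to the classic "only talk to higher-id neighbors" triangle count: each triangle is discovered exactly once, at its lowest-id vertex. A compact single-machine sketch of the same idea, without message passing (illustrative only, not a Gelly library method):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TriangleCount {

    // Mirrors the three phases described above: each vertex u considers only
    // neighbors v with a higher id (phases 1/2), and a triangle is counted
    // when a higher-id neighbor w of v is also a neighbor of u (phase 3).
    public static int count(Map<Integer, Set<Integer>> adj) {
        int triangles = 0;
        for (int u : adj.keySet()) {
            for (int v : adj.get(u)) {
                if (v <= u) continue;                  // only higher-id neighbors
                for (int w : adj.get(v)) {
                    if (w > v && adj.get(u).contains(w)) {
                        triangles++;                   // u < v < w, all pairwise connected
                    }
                }
            }
        }
        return triangles;
    }

    // Convenience builder for an undirected adjacency map.
    public static void addEdge(Map<Integer, Set<Integer>> adj, int a, int b) {
        adj.computeIfAbsent(a, k -> new HashSet<>()).add(b);
        adj.computeIfAbsent(b, k -> new HashSet<>()).add(a);
    }
}
```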
[jira] [Commented] (FLINK-3898) Adamic-Adar Similarity
[ https://issues.apache.org/jira/browse/FLINK-3898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280260#comment-15280260 ] Vasia Kalavri commented on FLINK-3898: -- I guess we can close FLINK-2310 as a duplicate? I know that nobody is working on it. > Adamic-Adar Similarity > -- > > Key: FLINK-3898 > URL: https://issues.apache.org/jira/browse/FLINK-3898 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > The implementation of Adamic-Adar Similarity [0] is very close to Jaccard > Similarity. Whereas Jaccard Similarity counts common neighbors, Adamic-Adar > Similarity sums the inverse logarithm of the degree of common neighbors. > Consideration will be given to the computation of the inverse logarithm, in > particular whether to pre-compute a small array of values. > [0] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
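The "small array of values" consideration from the issue description can be sketched as a degree-indexed cache of 1/log(d), falling back to `Math.log` for degrees beyond the cache. This is an illustrative sketch with invented names, not the Flink implementation; note that in Adamic-Adar a common neighbor always has degree at least 2, so log(degree) is never zero:

```java
public class InverseLog {

    private final double[] cache;

    // Pre-compute 1/log(d) for small degrees; degree distributions are
    // typically skewed, so most lookups hit the cache.
    public InverseLog(int cacheSize) {
        cache = new double[cacheSize];
        for (int d = 2; d < cacheSize; d++) {
            cache[d] = 1.0 / Math.log(d);
        }
    }

    // Callers are expected to pass degree >= 2 (common neighbors of any
    // vertex pair are adjacent to both vertices).
    public double invLog(int degree) {
        return degree < cache.length ? cache[degree] : 1.0 / Math.log(degree);
    }
}
```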
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62863450
--- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of the HITS algorithm, using a scatter-gather iteration.
+ * The user can define the maximum number of iterations. The HITS algorithm assigns two scores to each
+ * vertex, a hub score and an authority score. A good hub represents a page that points to many other
+ * pages, and a good authority represents a page that is linked by many different hubs.
+ * Each vertex has a value of Tuple2 type: the first field is the hub score and the second field is the
+ * authority score. The implementation assumes that the two scores are the same for each vertex at the beginning.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
+public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
+
+	private int maxIterations;
+
+	public HITSAlgorithm(int maxIterations) {
+		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
+		this.maxIterations = maxIterations * 2 + 1;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
+
+		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
+		parameter.setDirection(EdgeDirection.ALL);
+		parameter.registerAggregator("sumVertexValue", new DoubleSumAggregator());
+
+		return netGraph
+			.mapVertices(new VertexInitMapper<K, VV>())
+			.mapEdges(new NullValueEdgeMapper<K, EV>())
--- End diff --
Mapping to `NullValue` is not necessary if the edge values are already `NullValue`. Based on the code in `Translate`, we can `TypeInformation<EV> typeInfo = ((TupleTypeInfo<Edge<K, EV>>) netGraph.getEdges().getType()).getTypeAt(2);` and then `if (typeInfo.getTypeClass().equals(NullValue.class)) ...`
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
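The reviewer's suggestion boils down to a runtime type check: only insert the `NullValueEdgeMapper` step when the edge value class is not already `NullValue`. Stripped of Flink's type-information machinery, the decision looks like this (hypothetical stand-in types, not the Gelly API):

```java
public class EdgeMapDemo {

    // Hypothetical marker class standing in for org.apache.flink.types.NullValue.
    public static final class NullValue { }

    // Skip the remapping when the edge value type is already NullValue,
    // as suggested in the review comment.
    public static boolean needsNullValueMapping(Class<?> edgeValueClass) {
        return !edgeValueClass.equals(NullValue.class);
    }
}
```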
[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm
[ https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280246#comment-15280246 ] Vasia Kalavri commented on FLINK-3879: -- [~greghogan] - Do we agree that the PR for FLINK-2044 is now in good state and could be merged? Or would you rather benchmark this against it and go for the most performant one? - Gelly library methods: currently there are scatter-gather and GSA implementations for PageRank, Connected Components, and SSSP. We have these because GSA performs better for graphs with skewed degree distributions. In the Gelly docs-[iteration abstractions comparison|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html#iteration-abstractions-comparison], we describe when GSA should be preferred over scatter-gather. Maybe we can make this more explicit. There is no Pregel implementation (only in examples). The {{GSATriangleCount}} library method has proved to be very inefficient and should be removed imo (I'll open a JIRA). - I'm not sure what you mean by "approximate HITS"? > Native implementation of HITS algorithm > --- > > Key: FLINK-3879 > URL: https://issues.apache.org/jira/browse/FLINK-3879 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is > presented in [0] and described in [1]. > "[HITS] is a very popular and effective algorithm to rank documents based on > the link information among a set of documents. The algorithm presumes that a > good hub is a document that points to many others, and a good authority is a > document that many documents point to." 
> [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf] > This implementation differs from FLINK-2044 by providing for convergence, > outputting both hub and authority scores, and completing in half the number > of iterations. > [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf > [1] https://en.wikipedia.org/wiki/HITS_algorithm -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62863557
--- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of the HITS algorithm, using a scatter-gather iteration.
+ * The user can define the maximum number of iterations. The HITS algorithm assigns two scores to each
+ * vertex, a hub score and an authority score. A good hub represents a page that points to many other
+ * pages, and a good authority represents a page that is linked by many different hubs.
+ * Each vertex has a value of Tuple2 type: the first field is the hub score and the second field is the
+ * authority score. The implementation assumes that the two scores are the same for each vertex at the beginning.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
+public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
+
+	private int maxIterations;
+
+	public HITSAlgorithm(int maxIterations) {
+		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
+		this.maxIterations = maxIterations * 2 + 1;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
--- End diff --
Change `Double` to `DoubleValue`?
[GitHub] flink pull request: [FLINK-3701] reuse serializer lists in Executi...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1913#issuecomment-218483592 Thanks for your remarks. Instead of serializing/deserializing parts of the `ExecutionConfig` whenever the usercode class loader needs to be passed, I would like to serialize/deserialize the entire `ExecutionConfig`. This makes everything explicit wherever the config is used. Plus, we create an explicit copy of the config upon submission, regardless of the Akka object reuse mode. The PojoSerializer will just keep a regular copy of the ExecutionConfig because we always have the user code class loader available then.
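The "explicit copy of the config upon submission" can be obtained with a plain serialization round trip, which yields a deep copy independent of any object-reuse behavior in the transport layer. A generic sketch with a hypothetical stand-in config class (invented names, not the real `ExecutionConfig` API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConfigCopy {

    // Hypothetical stand-in for a serializable configuration object.
    public static class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        public int parallelism = 4;
    }

    // A serialize/deserialize round trip produces an explicit deep copy.
    public static Config copyViaSerialization(Config original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Config) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```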
[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1981#issuecomment-218481504 Thanks for this PR, @dawidwys! I haven't had a look yet but will do soon. Just a quick comment: this PR touches some files that PR #1958 is also modifying (mostly on the API level, not the runtime and optimization code). PR #1958 is a bigger change but in good shape and should be mergeable soon. I would like to merge #1958 before this one, so you will need to rebase once this has happened. That should not be too much work, just wanted to let you know in advance. Thanks, Fabian
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280204#comment-15280204 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-218475561 @yjshen great work! PR looks very good. I had only some minor refactoring comments. > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280218#comment-15280218 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-218477620 Hi @twalthr, thanks very much for the review work! I'll resolve your comments shortly :) > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3776] Flink Scala shell does not allow ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1945#discussion_r62857420
--- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -141,7 +141,7 @@ object FlinkShell {
   ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
-        val config = new Configuration()
+        val config = GlobalConfiguration.getConfiguration()
--- End diff --
I think the configuration needs to be loaded similarly to line 183.
[jira] [Commented] (FLINK-3776) Flink Scala shell does not allow to set configuration for local execution
[ https://issues.apache.org/jira/browse/FLINK-3776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280203#comment-15280203 ] ASF GitHub Bot commented on FLINK-3776: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1945#discussion_r62857420
--- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -141,7 +141,7 @@ object FlinkShell {
   ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
-        val config = new Configuration()
+        val config = GlobalConfiguration.getConfiguration()
--- End diff --
I think the configuration needs to be loaded similarly to line 183.
> Flink Scala shell does not allow to set configuration for local execution
> Key: FLINK-3776
> URL: https://issues.apache.org/jira/browse/FLINK-3776
> Project: Flink
> Issue Type: Improvement
> Components: Scala Shell
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Priority: Minor
>
> Flink's Scala shell starts a {{LocalFlinkMiniCluster}} with an empty configuration when the shell is started in local mode. In order to allow the user to configure the mini cluster, e.g., number of slots, size of memory, it would be good to forward a user specified configuration.
[jira] [Commented] (FLINK-3776) Flink Scala shell does not allow to set configuration for local execution
[ https://issues.apache.org/jira/browse/FLINK-3776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280209#comment-15280209 ] ASF GitHub Bot commented on FLINK-3776: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1945#issuecomment-218476654 Thanks for the pull request. Looks good. Have you tested that the configuration is loaded correctly? > Flink Scala shell does not allow to set configuration for local execution > - > > Key: FLINK-3776 > URL: https://issues.apache.org/jira/browse/FLINK-3776 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Minor > > Flink's Scala shell starts a {{LocalFlinkMiniCluster}} with an empty > configuration when the shell is started in local mode. In order to allow the > user to configure the mini cluster, e.g., number of slots, size of memory, it > would be good to forward a user specified configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3733) registeredTypesWithKryoSerializers is not assigned in ExecutionConfig#deserializeUserCode()
[ https://issues.apache.org/jira/browse/FLINK-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280193#comment-15280193 ] Maximilian Michels commented on FLINK-3733: --- I think this might be a duplicate of FLINK-3701.
> registeredTypesWithKryoSerializers is not assigned in ExecutionConfig#deserializeUserCode()
> Key: FLINK-3733
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Priority: Minor
>
> {code}
> if (serializedRegisteredTypesWithKryoSerializers != null) {
>     registeredTypesWithKryoSerializers =
>         serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
> } else {
>     registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
> }
> {code}
> When serializedRegisteredTypesWithKryoSerializers is null, registeredTypesWithKryoSerializers is not assigned.
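The snippet in the report assigns a different field (`registeredTypesWithKryoSerializerClasses`) in the else branch, so `registeredTypesWithKryoSerializers` can remain null after deserialization. A corrected sketch of the pattern, using simplified stand-in types rather than the real `ExecutionConfig`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeserializeDemo {

    // Simplified stand-in for the field named in the report.
    public Map<String, String> registeredTypesWithKryoSerializers;

    // Corrected pattern: both branches assign the SAME field, so it cannot
    // be left null when nothing was serialized.
    public void deserializeUserCode(Map<String, String> serialized) {
        if (serialized != null) {
            registeredTypesWithKryoSerializers = serialized;
        } else {
            registeredTypesWithKryoSerializers = new LinkedHashMap<>();
        }
        // The *Classes field would get the same null-safe treatment separately.
    }
}
```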
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280187#comment-15280187 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62856490 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala --- @@ -15,9 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.api.table +package org.apache.flink.api.table.expressions -/** - * General Exception for all errors during table handling. - */ -class TableException(msg: String) extends RuntimeException(msg) +case class UnresolvedException(msg: String) extends RuntimeException(msg) --- End diff -- OK, will do this > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280178#comment-15280178 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62855247 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala --- @@ -18,85 +18,26 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.SqlOperator -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure} /** * General expression for unresolved function calls. The function can be a built-in * scalar function or a user-defined scalar function. */ -case class Call(functionName: String, args: Expression*) extends Expression { +case class Call(functionName: String, args: Seq[Expression]) extends Expression { --- End diff -- While resolving `Call` into concrete expressions, `FunctionCatalog` is used, and its `withChildren` method looks up constructors to create the new expression as follows: 1. look up a constructor that takes a `Seq[Expression]` argument; 2. if 1 is not fulfilled, look up the constructor whose arity exactly matches the number of arguments, like the `MyFunc` example you provided. Does this make sense?
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62853599 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -130,19 +130,17 @@ trait ImplicitExpressionOperations { * @param endIndex last character of the substring (starting at 1, inclusive) * @return substring */ - def substring(beginIndex: Expression, endIndex: Expression) = { -Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex) - } + def substring(beginIndex: Expression, endIndex: Expression) = +SubString(expr, beginIndex, endIndex) /** * Creates a substring of the given string beginning at the given index to the end. * * @param beginIndex first character of the substring (starting at 1, inclusive) * @return substring */ - def substring(beginIndex: Expression) = { -Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex) - } + def substring(beginIndex: Expression) = +new SubString(expr, beginIndex) --- End diff -- Since `SubString` is defined as `case class SubString(str: Expression, begin: Expression, end: Expression)`, the generated `apply` method has that same three-argument signature, so the `new` here cannot be removed; otherwise the compiler complains `cannot resolve SubString with such signature`.
[GitHub] flink pull request: Flink 3750 fixed
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62854701 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc.example; + +import java.sql.Types; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; +import org.apache.flink.api.java.io.jdbc.JDBCTestBase; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.junit.Test; + +public class JDBCFullTest extends JDBCTestBase { + + @Test + public void test() throws Exception { --- End diff -- Before my PR this class was not even a test; the code was inside a main(). I thought that was at least better than before, but I didn't want to spend more time on this. Is it a problem if we leave it as it is?
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62851933 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala --- @@ -18,85 +18,26 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.SqlOperator -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure} /** * General expression for unresolved function calls. The function can be a built-in * scalar function or a user-defined scalar function. */ -case class Call(functionName: String, args: Expression*) extends Expression { +case class Call(functionName: String, args: Seq[Expression]) extends Expression { --- End diff -- The reason I used var-args here was that they are easier to use in the API once custom functions are possible: `.filter(Call("MYFUNC", 'f1, 'f2))`. But we can also find another way later.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280168#comment-15280168 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62853748 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation + +object TypeCoercion { --- End diff -- You are right, will fix this.
[jira] [Updated] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3780: -- Fix Version/s: 1.1.0 > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is similar to {{TriangleListing}} but instead of > joining two-paths against an edge list we count two-paths. > {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which > relies on {{Graph.getTriplets()}} so only computes similarity scores for > neighbors but not neighbors-of-neighbors. > This algorithm is easily modified for other similarity scores such as > Adamic-Adar similarity where the sum of endpoint degrees is replaced by the > degree of the middle vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
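As a reference for the score being computed, here is a minimal plain-Java sketch of the Jaccard similarity for one vertex pair: shared neighbors divided by distinct neighbors. This is illustrative only; the Gelly implementation described above works on two-paths, not materialized neighbor sets.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative (non-Gelly) computation of the Jaccard similarity score:
// |shared neighbors| / |distinct neighbors| for a pair of vertices.
final class Jaccard {
    static double score(Set<Integer> neighborsA, Set<Integer> neighborsB) {
        Set<Integer> shared = new HashSet<>(neighborsA);
        shared.retainAll(neighborsB);            // intersection
        Set<Integer> distinct = new HashSet<>(neighborsA);
        distinct.addAll(neighborsB);             // union
        return distinct.isEmpty() ? 0.0 : (double) shared.size() / distinct.size();
    }
}
```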
[jira] [Created] (FLINK-3898) Adamic-Adar Similarity
Greg Hogan created FLINK-3898: - Summary: Adamic-Adar Similarity Key: FLINK-3898 URL: https://issues.apache.org/jira/browse/FLINK-3898 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan The implementation of Adamic-Adar Similarity [0] is very close to Jaccard Similarity. Whereas Jaccard Similarity counts common neighbors, Adamic-Adar Similarity sums the inverse logarithm of the degree of common neighbors. Consideration will be given to the computation of the inverse logarithm, in particular whether to pre-compute a small array of values. [0] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
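The pre-computation consideration mentioned above might be sketched as follows: a small cached array of 1/log(d) values for low degrees, with a fallback computation for larger ones. Class and method names are illustrative, not from the Gelly codebase.

```java
// Sketch of the consideration above: Adamic-Adar weighs each common
// neighbor by 1 / log(degree), so values for small degrees can be cached
// in an array instead of being recomputed per candidate pair.
final class AdamicAdarWeights {
    private final double[] cache;

    AdamicAdarWeights(int maxCachedDegree) {
        cache = new double[maxCachedDegree + 1];
        for (int d = 2; d <= maxCachedDegree; d++) {   // log(1) = 0, so start at 2
            cache[d] = 1.0 / Math.log(d);
        }
    }

    double inverseLog(int degree) {
        // Cached lookup for small degrees, direct computation otherwise.
        return degree < cache.length ? cache[degree] : 1.0 / Math.log(degree);
    }
}
```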
[GitHub] flink pull request: Flink 3750 fixed
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62852662 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException { * @throws java.io.IOException */ @Override - public OUT nextRecord(OUT tuple) throws IOException { + public Row nextRecord(Row row) throws IOException { try { - resultSet.next(); - if (columnTypes == null) { - extractTypes(tuple); + hasNext = resultSet.next(); + if (!hasNext) { + return null; + } + try { + //This throws a NPE when the TypeInfo is not passed to the InputFormat, + //i.e. KryoSerializer used to generate the passed row + row.productArity(); --- End diff -- because it can happen that the row != null but internally it is uninitialized (see for example **JDBCInputFormatTest.testUninitializedRow**). That happens when you do the following (for example): ```java DataSet source = environment.createInput(inputBuilder.finish()); ``` The only method I found to detect such a situation is to catch the `NullPointerException` generated when trying to access one of its methods, e.g. ```java row.productArity() ```
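The detection trick described in the comment can be reproduced with a toy stand-in class (a hypothetical `FakeRow`, not Flink's actual `Row`): a non-null object whose internals were never initialized throws a `NullPointerException` on first use, and the caller catches it.

```java
// Toy reproduction of the pattern above: `FakeRow` is a hypothetical
// stand-in for an object created without its internal state being set up.
class FakeRow {
    private Object[] fields;                 // left null by a "blind" instantiation

    int productArity() { return fields.length; }   // NPE if uninitialized

    void init(int arity) { fields = new Object[arity]; }

    static boolean isInitialized(FakeRow row) {
        try {
            row.productArity();
            return true;
        } catch (NullPointerException e) {
            return false;                    // the detection trick from the comment
        }
    }
}
```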
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280151#comment-15280151 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62852367 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala --- @@ -20,19 +20,55 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation} import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate._ -case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression { +case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression { - override def toString = s"$child.cast($tpe)" + override def toString = s"$child.cast($resultType)" override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { -relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe)) +relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(resultType)) } - override def makeCopy(anyRefs: Seq[AnyRef]): this.type = { + override def makeCopy(anyRefs: Array[AnyRef]): this.type = { val child: Expression = anyRefs.head.asInstanceOf[Expression] -copy(child, tpe).asInstanceOf[this.type] +copy(child, resultType).asInstanceOf[this.type] + } + + override def validateInput(): ExprValidationResult = { +if (Cast.canCast(child.resultType, resultType)) { + ValidationSuccess +} else { + ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType") +} + } +} + +object Cast { --- End diff -- I would also move this into `typeutils`. 
We may need it somewhere else too.
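For illustration, a `canCast`-style check could look like the following sketch. The rules here are hypothetical (numeric-to-numeric plus anything-to-String); Flink's actual rules live in the `Cast` companion object under discussion.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of a canCast-style validation helper. The type names
// and rules are assumptions for the example, NOT Flink's real cast matrix.
final class CastRules {
    private static final List<String> NUMERIC =
        Arrays.asList("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE");

    static boolean canCast(String from, String to) {
        if (to.equals("STRING")) {
            return true;                       // everything casts to String
        }
        // Any numeric type may be cast to any other numeric type here.
        return NUMERIC.contains(from) && NUMERIC.contains(to);
    }
}
```

A validation phase would call such a predicate and turn a `false` result into a `ValidationFailure` message like the one shown in the diff.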
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280150#comment-15280150 ] ASF GitHub Bot commented on FLINK-2044: --- Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-218467237 @vasia Thanks a lot and PR has been updated as you advised. > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280140#comment-15280140 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62851129 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala --- @@ -18,85 +18,26 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.SqlOperator -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure} /** * General expression for unresolved function calls. The function can be a built-in * scalar function or a user-defined scalar function. */ -case class Call(functionName: String, args: Expression*) extends Expression { +case class Call(functionName: String, args: Seq[Expression]) extends Expression { override def children: Seq[Expression] = args override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { -relBuilder.call( - BuiltInFunctionNames.toSqlOperator(functionName), - args.map(_.toRexNode): _*) +throw new UnresolvedException(s"trying to convert UnresolvedFunction $functionName to RexNode") } override def toString = s"\\$functionName(${args.mkString(", ")})" - override def makeCopy(newArgs: Seq[AnyRef]): this.type = { -val copy = Call( - newArgs.head.asInstanceOf[String], - newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*) + override def resultType = +throw new UnresolvedException(s"calling dataType on Unresolved Function $functionName") --- End diff -- resultType instead of dataType. Space between Unresolved Function. 
[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm
[ https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280134#comment-15280134 ] Greg Hogan commented on FLINK-3879: --- Implementations which merely showcase the use of Gelly graph models seem most appropriate as examples. Do we have examples of inputs which perform better as GSA vs SG vs Pregel? I am not finding any direct guidance in the documentation for a user looking to choose between duplicate library algorithms. FLINK-3879 will be faster unless one of the current models is extended or a new graph model is created to process out- and in- edges separately in the same iteration or to allow disabling operators on certain supersteps. An approximate HITS using delta iterations would be as easy to implement natively as with GSA. Before accepting such an implementation I would like to see evidence that performing more approximate iterations converges more quickly when compared with running fewer bulk iterations. > Native implementation of HITS algorithm > --- > > Key: FLINK-3879 > URL: https://issues.apache.org/jira/browse/FLINK-3879 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is > presented in [0] and described in [1]. > "[HITS] is a very popular and effective algorithm to rank documents based on > the link information among a set of documents. The algorithm presumes that a > good hub is a document that points to many others, and a good authority is a > document that many documents point to." > [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf] > This implementation differs from FLINK-2044 by providing for convergence, > outputting both hub and authority scores, and completing in half the number > of iterations. 
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf > [1] https://en.wikipedia.org/wiki/HITS_algorithm -- This message was sent by Atlassian JIRA (v6.3.4#6332)
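For readers following the HITS discussions in this thread, the hub/authority update that all of these implementations iterate can be sketched in plain Java, independent of Gelly's iteration models (the edge-array graph encoding and L2 normalization below are assumptions of this sketch, not taken from the Flink code):

```java
// Plain-Java sketch of the HITS iteration: authorities accumulate hub
// scores over incoming edges, hubs accumulate authority scores over
// outgoing edges, and both vectors are renormalized each round.
import java.util.Arrays;

class HitsSketch {
    // edges[i] = {src, dst}; n = vertex count. Returns {hub[], authority[]}.
    static double[][] run(int n, int[][] edges, int maxIterations) {
        double[] hub = new double[n];
        double[] auth = new double[n];
        Arrays.fill(hub, 1.0);   // both scores start equal, as in the PR
        Arrays.fill(auth, 1.0);
        for (int it = 0; it < maxIterations; it++) {
            // Authority update: sum of hub scores over incoming edges.
            Arrays.fill(auth, 0.0);
            for (int[] e : edges) auth[e[1]] += hub[e[0]];
            normalize(auth);
            // Hub update: sum of authority scores over outgoing edges.
            Arrays.fill(hub, 0.0);
            for (int[] e : edges) hub[e[0]] += auth[e[1]];
            normalize(hub);
        }
        return new double[][] {hub, auth};
    }

    private static void normalize(double[] v) {
        double norm = 0.0;
        for (double x : v) norm += x * x;
        norm = Math.sqrt(norm);
        if (norm > 0) {
            for (int i = 0; i < v.length; i++) v[i] /= norm;
        }
    }
}
```

On the toy graph {0→2, 1→2}, vertex 2 ends up as the sole authority and vertices 0 and 1 as equal hubs, matching the "a good hub points to many pages, a good authority is pointed to by many hubs" description quoted above.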
[GitHub] flink pull request: Flink 3750 fixed
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62850239 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java --- @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +/** + * Base test class for JDBC Input and Output formats + */ +public class JDBCTestBase { + + public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver"; + public static final String DB_URL = "jdbc:derby:memory:ebookshop"; + public static final String INPUT_TABLE = "books"; + public static final String OUTPUT_TABLE = "newbooks"; + public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE; + public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE; + public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0"; + public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?"; + + protected JDBCInputFormat jdbcInputFormat; + protected JDBCOutputFormat jdbcOutputFormat; + + protected static Connection conn; + + public static final Object[][] testData = { --- End diff -- The record with id 1010 already contains a null field... isn't that sufficient?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280129#comment-15280129 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62849325 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala --- @@ -15,9 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.api.table +package org.apache.flink.api.table.expressions -/** - * General Exception for all errors during table handling. - */ -class TableException(msg: String) extends RuntimeException(msg) +case class UnresolvedException(msg: String) extends RuntimeException(msg) --- End diff -- Could you put this into `exceptions.scala` and document it? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible.
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62849325 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala --- @@ -15,9 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.api.table +package org.apache.flink.api.table.expressions -/** - * General Exception for all errors during table handling. - */ -class TableException(msg: String) extends RuntimeException(msg) +case class UnresolvedException(msg: String) extends RuntimeException(msg) --- End diff -- Could you put this into `exceptions.scala` and document it?
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62848946 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation + +object TypeCoercion { --- End diff -- I would put this into the `typeutils` package. The `expressions` package should only contain expressions.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280120#comment-15280120 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62848946 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation + +object TypeCoercion { --- End diff -- I would put this into the `typeutils` package. The `expressions` package should only contain expressions. 
[GitHub] flink pull request: Flink 3750 fixed
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62847383 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -81,25 +134,51 @@ public void configure(Configuration parameters) { * @throws IOException */ @Override - public void open(InputSplit ignored) throws IOException { + public void open(InputSplit inputSplit) throws IOException { + hasNext = true; try { - establishConnection(); - statement = dbConn.createStatement(resultSetType, resultSetConcurrency); - resultSet = statement.executeQuery(query); + if (inputSplit != null && parameterValues != null) { + for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) { + Object param = parameterValues[inputSplit.getSplitNumber()][i]; + if (param instanceof String) { + statement.setString(i + 1, (String) param); + } else if (param instanceof Long) { + statement.setLong(i + 1, (Long) param); + } else if (param instanceof Integer) { + statement.setInt(i + 1, (Integer) param); + } else if (param instanceof Double) { + statement.setDouble(i + 1, (Double) param); + } else if (param instanceof Boolean) { + statement.setBoolean(i + 1, (Boolean) param); + } else if (param instanceof Float) { + statement.setFloat(i + 1, (Float) param); + } else if (param instanceof BigDecimal) { + statement.setBigDecimal(i + 1, (BigDecimal) param); + } else if (param instanceof Byte) { + statement.setByte(i + 1, (Byte) param); + } else if (param instanceof Short) { + statement.setShort(i + 1, (Short) param); + } else if (param instanceof Date) { + statement.setDate(i + 1, (Date) param); + } else if (param instanceof Time) { + statement.setTime(i + 1, (Time) param); + } else if (param instanceof Timestamp) { + statement.setTimestamp(i + 1, (Timestamp) param); + } else if (param instanceof Array) { + statement.setArray(i + 1, (Array) param); + } else { + //extends with other types if needed + 
throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." ); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()]))); + } + } + resultSet = statement.executeQuery(); } catch (SQLException se) { close(); --- End diff -- I did that in the previous version of this PR (https://github.com/apache/flink/pull/1885) but @zentol told me to leave it ("this is not guaranteed, so please add them back"). From my check @zentol was right... isn't it?
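The discussion above concerns the explicit per-type `instanceof` chain; `PreparedStatement.setObject` would be shorter, but as the exchange with @zentol notes, driver support for it is not guaranteed for every type. A toy sketch of the same dispatch, reduced to a pure function so it can run without a database (the helper name and the `String` return value are illustration only, not Flink code):

```java
// Toy sketch mirroring the instanceof dispatch in the diff above: map a
// parameter value to the name of the PreparedStatement setter the chain
// would pick. Illustrative only -- real code calls the setters directly.
import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

class JdbcParamDispatch {
    static String setterFor(Object param) {
        if (param instanceof String) return "setString";
        else if (param instanceof Long) return "setLong";
        else if (param instanceof Integer) return "setInt";
        else if (param instanceof Double) return "setDouble";
        else if (param instanceof Boolean) return "setBoolean";
        else if (param instanceof Float) return "setFloat";
        else if (param instanceof BigDecimal) return "setBigDecimal";
        else if (param instanceof Byte) return "setByte";
        else if (param instanceof Short) return "setShort";
        else if (param instanceof Date) return "setDate";
        else if (param instanceof Time) return "setTime";
        else if (param instanceof Timestamp) return "setTimestamp";
        else if (param instanceof Array) return "setArray";
        else throw new IllegalArgumentException(
            "Parameter of type " + param.getClass() + " is not handled (yet).");
    }
}
```

Note the ordering is safe because `java.sql.Timestamp` extends `java.util.Date`, not `java.sql.Date`, so a `Timestamp` falls through to its own branch rather than matching `Date`.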
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280074#comment-15280074 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62843680 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -130,19 +130,17 @@ trait ImplicitExpressionOperations { * @param endIndex last character of the substring (starting at 1, inclusive) * @return substring */ - def substring(beginIndex: Expression, endIndex: Expression) = { -Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex) - } + def substring(beginIndex: Expression, endIndex: Expression) = +SubString(expr, beginIndex, endIndex) /** * Creates a substring of the given string beginning at the given index to the end. * * @param beginIndex first character of the substring (starting at 1, inclusive) * @return substring */ - def substring(beginIndex: Expression) = { -Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex) - } + def substring(beginIndex: Expression) = +new SubString(expr, beginIndex) --- End diff -- `new` can be removed.
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62843680 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -130,19 +130,17 @@ trait ImplicitExpressionOperations { * @param endIndex last character of the substring (starting at 1, inclusive) * @return substring */ - def substring(beginIndex: Expression, endIndex: Expression) = { -Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex) - } + def substring(beginIndex: Expression, endIndex: Expression) = +SubString(expr, beginIndex, endIndex) /** * Creates a substring of the given string beginning at the given index to the end. * * @param beginIndex first character of the substring (starting at 1, inclusive) * @return substring */ - def substring(beginIndex: Expression) = { -Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex) - } + def substring(beginIndex: Expression) = +new SubString(expr, beginIndex) --- End diff -- `new` can be removed.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280049#comment-15280049 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-218447948 Hi @fhueske , I've just finished my work, can you give another pass of review? Thanks!
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-218447948 Hi @fhueske , I've just finished my work, can you give another pass of review? Thanks!
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-218446355 Thanks for the updates @gallenvara. I left a few minor comments. Could you please also add the algorithm in the Gelly documentation under "library methods"? It should be good to merge after that :)
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280047#comment-15280047 ] ASF GitHub Bot commented on FLINK-2044: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1956#issuecomment-218446355 Thanks for the updates @gallenvara. I left a few minor comments. Could you please also add the algorithm in the Gelly documentation under "library methods"? It should be good to merge after that :) > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS)
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280042#comment-15280042 ] ASF GitHub Bot commented on FLINK-2044: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62838798 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.aggregators.DoubleSumAggregator; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterGatherConfiguration; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.NullValueEdgeMapper; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Preconditions; + +/** + * This is an implementation of HITS algorithm, using a scatter-gather iteration. + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters, + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority + * represented a page that was linked by many different hubs. The implementation assumes that the two value on + * every vertex are the same at the beginning. + * + * + * @see https://en.wikipedia.org/wiki/HITS_algorithm;>HITS Algorithm + */ --- End diff -- Can you also please add a comment about the result type? Which tuple field is the authority score and which is the hub? > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. 
the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS)
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280040#comment-15280040 ] ASF GitHub Bot commented on FLINK-2044: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62838726 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.aggregators.DoubleSumAggregator; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterGatherConfiguration; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.NullValueEdgeMapper; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Preconditions; + +/** + * This is an implementation of HITS algorithm, using a scatter-gather iteration. + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters, + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority + * represented a page that was linked by many different hubs. The implementation assumes that the two value on --- End diff -- represented => represents pointed => points was linked => is linked the two value => the two values*
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62838798 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.aggregators.DoubleSumAggregator; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterGatherConfiguration; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.NullValueEdgeMapper; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Preconditions; + +/** + * This is an implementation of HITS algorithm, using a scatter-gather iteration. + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters, + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority + * represented a page that was linked by many different hubs. The implementation assumes that the two value on + * every vertex are the same at the beginning. + * + * + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a> + */ --- End diff -- Can you also please add a comment about the result type? Which tuple field is the authority score and which is the hub?
[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62838726 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.aggregators.DoubleSumAggregator; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.ScatterGatherConfiguration; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.NullValueEdgeMapper; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Preconditions; + +/** + * This is an implementation of HITS algorithm, using a scatter-gather iteration. + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters, + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority + * represented a page that was linked by many different hubs. The implementation assumes that the two value on --- End diff -- represented => represents pointed => points was linked => is linked the two value => the two values*
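The alternating hub/authority update that the javadoc above describes can be illustrated independently of Gelly. The following stand-alone sketch (a hypothetical class, not part of the PR) runs the same iteration on a plain adjacency matrix, normalizing both score vectors after every round:

```java
import java.util.Arrays;

// Minimal HITS power iteration on an adjacency matrix; an illustration of the
// algorithm, not the Gelly scatter-gather implementation under review.
// adj[i][j] == true means there is an edge i -> j.
public class HitsSketch {

    // Returns { hubScores, authorityScores } after the given number of iterations.
    public static double[][] run(boolean[][] adj, int iterations) {
        int n = adj.length;
        double[] hub = new double[n];
        double[] auth = new double[n];
        Arrays.fill(hub, 1.0);   // both values start out the same, as the javadoc assumes
        Arrays.fill(auth, 1.0);
        for (int it = 0; it < iterations; it++) {
            // authority score: sum of hub scores of in-neighbors
            for (int j = 0; j < n; j++) {
                auth[j] = 0.0;
                for (int i = 0; i < n; i++) {
                    if (adj[i][j]) auth[j] += hub[i];
                }
            }
            // hub score: sum of authority scores of out-neighbors
            for (int i = 0; i < n; i++) {
                hub[i] = 0.0;
                for (int j = 0; j < n; j++) {
                    if (adj[i][j]) hub[i] += auth[j];
                }
            }
            normalize(auth);
            normalize(hub);
        }
        return new double[][] { hub, auth };
    }

    // L2-normalize a score vector in place.
    static void normalize(double[] v) {
        double norm = 0.0;
        for (double x : v) norm += x * x;
        norm = Math.sqrt(norm);
        if (norm > 0) {
            for (int i = 0; i < v.length; i++) v[i] /= norm;
        }
    }
}
```

On a small chain 0 -> 1, 0 -> 2, 1 -> 2 this yields vertex 0 as the best hub (it points to everything) and vertex 2 as the best authority (everything points to it), matching the intuition in the javadoc.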
[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm
[ https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280037#comment-15280037 ] ASF GitHub Bot commented on FLINK-2044: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1956#discussion_r62837591 --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples.data; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.NullValue; + +import java.util.ArrayList; +import java.util.List; + +/** + * Provides the default data set used for the HITS test program. + * If no parameters are given to the program, the default edge data set is used. --- End diff -- I guess you copied this comment from another similar class. The "If no parameters given..." refers to Gelly examples, which run with default data if no parameters are provided. 
In this case HITS is implemented as a library method, so this comment can be removed. This data is only used for testing as far as I can tell :) > Implementation of Gelly HITS Algorithm > -- > > Key: FLINK-2044 > URL: https://issues.apache.org/jira/browse/FLINK-2044 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Ahamd Javid >Assignee: GaoLun >Priority: Minor > > Implementation of Hits Algorithm in Gelly API using Java. the feature branch > can be found here: (https://github.com/JavidMayar/flink/commits/HITS) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280018#comment-15280018 ] Dawid Wysakowicz commented on FLINK-3848: - Sure, I will have a look into the current state and wait for more info, no hurry. > Add ProjectableTableSource interface and translation rule > - > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations > that support projection push-down. > The interface could look as follows > {code} > trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. We might need to tweak the cost model > as well to push the optimizer in the right direction. > Moreover, the {{CsvTableSource}} could be extended to implement > {{ProjectableTableSource}}.
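The proposed trait is Scala; as a rough Java illustration of the push-down contract (hypothetical class and field names, not the actual Flink interface), a projectable source would record the fields the optimizer pushed down and materialize only those:

```java
// Hypothetical Java rendering of the ProjectableTableSource idea from the
// issue: the optimizer calls setProjection() and the source then only "reads"
// the projected fields. Illustrative only; not the actual Flink API.
public class ProjectableCsvSource {
    private final String[] allFields;
    private String[] projection; // null means "no projection pushed down yet"

    public ProjectableCsvSource(String[] allFields) {
        this.allFields = allFields.clone();
    }

    // Mirrors setProjection(fields: Array[String]) from the proposed trait.
    public void setProjection(String[] fields) {
        this.projection = fields.clone();
    }

    // The fields the source would actually materialize per row.
    public String[] effectiveFields() {
        return projection != null ? projection.clone() : allFields.clone();
    }
}
```

A Calcite rule would match a projection on top of a scan of such a source, call `setProjection()`, and remove the now-redundant projection operator.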
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1941#issuecomment-218438515 Thanks for the update @fpompermaier. Overall the PR looks good. I added a few comments and suggestions. Thanks, Fabian
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62834635 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc.example; + +import java.sql.Types; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; +import org.apache.flink.api.java.io.jdbc.JDBCTestBase; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.junit.Test; + +public class JDBCFullTest extends JDBCTestBase { + + @Test + public void test() throws Exception { --- End diff -- This test does not assert the correctness of the data; it only checks that the code runs.
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62834552 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java --- @@ -19,135 +19,42 @@ package org.apache.flink.api.java.io.jdbc; import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; import java.sql.ResultSet; -import org.junit.Assert; - import org.apache.flink.api.java.tuple.Tuple5; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.apache.flink.api.table.Row; +import org.junit.Assert; import org.junit.Test; -public class JDBCOutputFormatTest { - private JDBCInputFormat jdbcInputFormat; - private JDBCOutputFormat jdbcOutputFormat; - - private static Connection conn; - - static final Object[][] dbData = { - {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11}, - {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, - {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, - {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, - {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}}; - - @BeforeClass - public static void setUpClass() throws SQLException { - try { - System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL"); - prepareDerbyDatabase(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - createTable("books"); - createTable("newbooks"); - insertDataToSQLTables(); - conn.close(); - } - - private static void createTable(String tableName) throws SQLException { - StringBuilder 
sqlQueryBuilder = new StringBuilder("CREATE TABLE "); - sqlQueryBuilder.append(tableName); - sqlQueryBuilder.append(" ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - } - - private static void insertDataToSQLTables() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - Statement stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - } - - @AfterClass - public static void tearDownClass() { - cleanUpDerbyDatabases(); - } - - private static void cleanUpDerbyDatabases() { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - - conn = DriverManager.getConnection(dbURL); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE books"); - stat.executeUpdate("DROP TABLE newbooks"); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } - - @After - public
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62834121 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java --- @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +/** + * Base test class for JDBC Input and Output formats + */ +public class JDBCTestBase { + + public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver"; + public static final String DB_URL = "jdbc:derby:memory:ebookshop"; + public static final String INPUT_TABLE = "books"; + public static final String OUTPUT_TABLE = "newbooks"; + public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE; + public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE; + public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0"; + public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?"; + + protected JDBCInputFormat jdbcInputFormat; + protected JDBCOutputFormat jdbcOutputFormat; + + protected static Connection conn; + + public static final Object[][] testData = { --- End diff -- Add one or two records with `null` values to the test data.
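The reviewer's suggestion could be sketched as follows (hypothetical class; the extra rows assume the `books` schema used in these tests, where every column except `id` allows NULL):

```java
// Sketch of extending the JDBC test data with rows containing null values,
// so the null-handling paths of the input/output formats are exercised.
// Illustrative only; not the actual JDBCTestBase contents.
public class TestDataWithNulls {

    public static final Object[][] TEST_DATA = {
        {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
        {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
        // rows with nulls in the nullable columns (title, author, price, qty)
        {1006, null, "Anonymous", 66.66, 66},
        {1007, "Null Fields Book", null, null, null}
    };

    // Count rows that contain at least one null field.
    public static int rowsWithNulls() {
        int count = 0;
        for (Object[] row : TEST_DATA) {
            for (Object field : row) {
                if (field == null) {
                    count++;
                    break;
                }
            }
        }
        return count;
    }
}
```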
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833701 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java --- @@ -19,180 +19,224 @@ package org.apache.flink.api.java.io.jdbc; import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; import java.sql.ResultSet; - +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.api.table.Row; +import org.apache.flink.core.io.InputSplit; import org.junit.Assert; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple5; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; -public class JDBCInputFormatTest { - JDBCInputFormat jdbcInputFormat; - - static Connection conn; - - static final Object[][] dbData = { - {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11}, - {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, - {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, - {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, - {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}}; - - @BeforeClass - public static void setUpClass() { - try { - prepareDerbyDatabase(); - } catch (Exception e) { - Assert.fail(); - } - } - - private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { - System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL"); - String dbURL = 
"jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - createTable(); - insertDataToSQLTable(); - conn.close(); - } - - private static void createTable() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - } - - private static void insertDataToSQLTable() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - Statement stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - } - - @AfterClass - public static void tearDownClass() { - cleanUpDerbyDatabases(); - } - - private static void cleanUpDerbyDatabases() { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - - conn = DriverManager.getConnection(dbURL); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE books"); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); -
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833691 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833637 --- Diff: flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833518 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java --- @@ -15,17 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.api.java.io.jdbc; +package org.apache.flink.api.java.io.jdbc.split; -import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; -@SuppressWarnings("unused") /** - * Utility class to disable derby logging - */ -public class DerbyUtil { - public static final OutputStream DEV_NULL = new OutputStream() { - public void write(int b) { - } - }; + * + * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel queries to run (i.e. splits). + * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider} implementation + + * */ +public interface ParameterValuesProvider extends Serializable { --- End diff -- The `ParameterValuesProvider` does not need to be `Serializable`. However, all values in the parameter values array must be, otherwise the JdbcInputFormat cannot be serialized. So the signature of `getParameterValues()` should be adapted to `public Serializable[][] getParameterValues()`.
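The signature change the reviewer suggests can be sketched like this (illustrative interface and class names, not the actual Flink code): the provider itself stays a plain object, while the parameter matrix is typed `Serializable[][]` so it can travel inside the serialized input format.

```java
// Sketch of the reviewer's suggestion: the provider need not be Serializable,
// but every parameter value must be, so getParameterValues() returns
// Serializable[][] instead of Object[][]. Illustrative names only.
interface ParameterValuesProviderSketch {
    java.io.Serializable[][] getParameterValues();
}

public class GenericProviderSketch implements ParameterValuesProviderSketch {
    private final java.io.Serializable[][] parameters;

    public GenericProviderSketch(java.io.Serializable[][] parameters) {
        this.parameters = parameters;
    }

    @Override
    public java.io.Serializable[][] getParameterValues() {
        // one row of this matrix parameterizes one parallel query (one split)
        return parameters;
    }
}
```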
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833327 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java --- @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io.jdbc.split; + +/** + * + * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column. + * The generated query set will be of size equal to the configured fetchSize (apart from the last range), + * ranging from the min value up to the max. + * + * For example, if there's a table BOOKS with a numeric PK id, using a query like: + * + * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ? + * + * + * you can use this class to automatically generate the parameters of the BETWEEN clause, + * based on the passed constructor parameters. 
+ * + * */ +public class NumericBetweenParametersProvider implements ParameterValuesProvider { + + private static final long serialVersionUID = 1L; + private long fetchSize; + private final long min; + private final long max; + + public NumericBetweenParametersProvider(long fetchSize, long min, long max) { + this.fetchSize = fetchSize; + this.min = min; + this.max = max; + } + + @Override + public Object[][] getParameterValues(){ + double maxEelemCount = (max - min) + 1; --- End diff -- typo in var name --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
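Variable-name typo aside, the range computation this provider performs is pure arithmetic and can be sketched independently of JDBC (`BetweenRanges` and `computeRanges` are illustrative names, not the actual implementation): every split covers `fetchSize` ids except possibly the last.

```java
import java.io.Serializable;

// Sketch: generate the (min, max) bounds for each "BETWEEN ? AND ?" query.
public class BetweenRanges {
    public static Serializable[][] computeRanges(long fetchSize, long min, long max) {
        long count = max - min + 1;                              // total ids to cover
        int numSplits = (int) Math.ceil((double) count / fetchSize);
        Serializable[][] params = new Serializable[numSplits][2];
        long start = min;
        for (int i = 0; i < numSplits; i++) {
            long end = Math.min(start + fetchSize - 1, max);     // last range may be shorter
            params[i] = new Serializable[] { start, end };
            start = end + 1;
        }
        return params;
    }
}
```

For a table with ids 0..24 and a fetch size of 10 this yields three splits: (0, 9), (10, 19), and the shorter (20, 24).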
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833230 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, ClassNotFoundException { } } - private enum SupportedTypes { - BOOLEAN, - BYTE, - SHORT, - INTEGER, - LONG, - STRING, - FLOAT, - DOUBLE - } - /** * Adds a record to the prepared statement. * * When this method is called, the output format is guaranteed to be opened. +* +* WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow * * @param tuple The records to add to the output. * @throws IOException Thrown, if the records could not be added due to an I/O problem. */ @Override - public void writeRecord(OUT tuple) throws IOException { + public void writeRecord(Row tuple) throws IOException { try { - if (types == null) { - extractTypes(tuple); + for (int index = 0; index < tuple.productArity(); index++) { + if (tuple.productElement(index) == null && typesArray != null && typesArray.length > 0) { + if (typesArray.length == tuple.productArity()) { + upload.setNull(index + 1, typesArray[index]); + } else { + LOG.warn("Column SQL types array doesn't match arity of SqlRow! Check the passed array..."); + } + } else { + //try generic set if no column type available + //WARNING: this may fail if the JDBC driver doesn't handle null correctly + upload.setObject(index + 1, tuple.productElement(index)); --- End diff -- Are there any drawbacks of using the generic `setObject()` method compared to the typed methods? If yes, we should use them if the types are specified. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833149 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, ClassNotFoundException { } } - private enum SupportedTypes { - BOOLEAN, - BYTE, - SHORT, - INTEGER, - LONG, - STRING, - FLOAT, - DOUBLE - } - /** * Adds a record to the prepared statement. * * When this method is called, the output format is guaranteed to be opened. +* +* WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types specified in the SqlRow * * @param tuple The records to add to the output. * @throws IOException Thrown, if the records could not be added due to an I/O problem. */ @Override - public void writeRecord(OUT tuple) throws IOException { + public void writeRecord(Row tuple) throws IOException { try { - if (types == null) { - extractTypes(tuple); + for (int index = 0; index < tuple.productArity(); index++) { + if (tuple.productElement(index) == null && typesArray != null && typesArray.length > 0) { + if (typesArray.length == tuple.productArity()) { --- End diff -- Move this check out of the loop. We do not need to check this for every attribute. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
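Hoisting the arity check out of the per-field loop, as suggested, could look like the sketch below. `planBindings` is a hypothetical pure-logic stand-in: it records which JDBC setter would be invoked rather than touching a real `PreparedStatement`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: the typesArray/arity mismatch is detected once per record,
// before the loop, instead of once per column.
public class WriteRecordSketch {
    public static List<String> planBindings(Object[] row, int[] typesArray) {
        // Check once whether declared SQL types can be used for null fields.
        boolean typesUsable = typesArray != null && typesArray.length == row.length;
        List<String> plan = new ArrayList<>();
        for (int i = 0; i < row.length; i++) {
            if (row[i] == null && typesUsable) {
                plan.add("setNull(" + (i + 1) + ")");
            } else {
                // generic set; may fail if the driver doesn't handle null well
                plan.add("setObject(" + (i + 1) + ")");
            }
        }
        return plan;
    }
}
```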
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833050 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -352,6 +318,10 @@ public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) format.resultSetConcurrency = resultSetConcurrency; return this; } + public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) { --- End diff -- Add new line above ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62833018 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -314,6 +279,7 @@ public static JDBCInputFormatBuilder buildJDBCInputFormat() { public JDBCInputFormatBuilder() { this.format = new JDBCInputFormat(); + //use the "Firehose cursor" (see http://jtds.sourceforge.net/resultSets.html) --- End diff -- Can you explain what this comment is about? ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832955 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException { * @throws java.io.IOException */ @Override - public OUT nextRecord(OUT tuple) throws IOException { + public Row nextRecord(Row row) throws IOException { try { - resultSet.next(); - if (columnTypes == null) { - extractTypes(tuple); + hasNext = resultSet.next(); + if (!hasNext) { + return null; + } + try { + //This throws a NPE when the TypeInfo is not passed to the InputFormat, + //i.e. KryoSerializer used to generate the passed row + row.productArity(); + } catch(NullPointerException npe) { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + row = new Row(resultSetMetaData.getColumnCount()); + LOG.warn("TypeInfo not provided to the InputFormat. Row cannot be reused."); } - addValue(tuple); - return tuple; + for (int pos = 0; pos < row.productArity(); pos++) { + row.setField(pos, resultSet.getObject(pos + 1)); + } + return row; } catch (SQLException se) { close(); --- End diff -- Let the data source task call `close()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832846 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException { * @throws java.io.IOException */ @Override - public OUT nextRecord(OUT tuple) throws IOException { + public Row nextRecord(Row row) throws IOException { try { - resultSet.next(); - if (columnTypes == null) { - extractTypes(tuple); + hasNext = resultSet.next(); + if (!hasNext) { + return null; + } + try { + //This throws a NPE when the TypeInfo is not passed to the InputFormat, + //i.e. KryoSerializer used to generate the passed row + row.productArity(); --- End diff -- Why not check `row == null` instead of putting logic into the `catch` branch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
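The explicit null check proposed here could be sketched as below, with a plain `Object[]` standing in for Flink's `Row` (the helper name `ensureRow` is illustrative):

```java
// Sketch: reuse the passed row if present, otherwise allocate one sized to
// the result set's column count, instead of provoking and catching an NPE.
public class RowReuse {
    public static Object[] ensureRow(Object[] row, int columnCount) {
        if (row == null) {
            // TypeInfo was not provided; the row cannot be reused across calls.
            return new Object[columnCount];
        }
        return row;
    }
}
```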
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832789 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException { * @throws java.io.IOException */ @Override - public OUT nextRecord(OUT tuple) throws IOException { + public Row nextRecord(Row row) throws IOException { try { - resultSet.next(); - if (columnTypes == null) { - extractTypes(tuple); + hasNext = resultSet.next(); --- End diff -- If `resultSet.next()` was called at the end of `open()` we should call it again at the end of `nextRecord()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832683 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -113,19 +192,7 @@ public void close() throws IOException { try { resultSet.close(); --- End diff -- check `resultSet == null` instead of catching NPE? ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832699 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -113,19 +192,7 @@ public void close() throws IOException { try { resultSet.close(); } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { - } - try { - statement.close(); --- End diff -- null check ---
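Factored into a helper, the null-check-based close these comments ask for might look like the sketch below; `closeQuietly` is an illustrative name, and the real code would log the failure rather than return a flag.

```java
// Sketch: close a resource only if it was ever opened, instead of
// catching the NullPointerException from closing a null reference.
public class CloseQuietly {
    public static boolean closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return false;            // nothing to close
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            // log and continue; failing to close should not fail the task
            return false;
        }
    }
}
```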
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832588 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -81,25 +134,51 @@ public void configure(Configuration parameters) { * @throws IOException */ @Override - public void open(InputSplit ignored) throws IOException { + public void open(InputSplit inputSplit) throws IOException { + hasNext = true; --- End diff -- Call `resultSet.next()` at the end of `open()` and only set `hasNext = true` if the split actually contains a record. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
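The lookahead pattern described here — advance once in `open()`, re-advance after each emitted record — can be sketched with an `Iterator` standing in for the JDBC `ResultSet` (`LookaheadSketch` is an illustrative name):

```java
import java.util.Iterator;
import java.util.List;

// Sketch: hasNext always reflects whether a record remains, so reachedEnd()
// is accurate even for an empty split.
public class LookaheadSketch<T> {
    private final Iterator<T> results;   // stand-in for the ResultSet
    private boolean hasNext;
    private T current;

    public LookaheadSketch(List<T> rows) {
        this.results = rows.iterator();
        advance();   // open(): hasNext is true only if the split has a record
    }

    private void advance() {
        hasNext = results.hasNext();
        current = hasNext ? results.next() : null;
    }

    public boolean reachedEnd() {
        return !hasNext;
    }

    public T nextRecord() {
        T out = current;
        advance();   // re-advance at the end of nextRecord()
        return out;
    }
}
```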
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832479 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -81,25 +134,51 @@ public void configure(Configuration parameters) { * @throws IOException */ @Override - public void open(InputSplit ignored) throws IOException { + public void open(InputSplit inputSplit) throws IOException { + hasNext = true; try { - establishConnection(); - statement = dbConn.createStatement(resultSetType, resultSetConcurrency); - resultSet = statement.executeQuery(query); + if (inputSplit != null && parameterValues != null) { + for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) { + Object param = parameterValues[inputSplit.getSplitNumber()][i]; + if (param instanceof String) { + statement.setString(i + 1, (String) param); + } else if (param instanceof Long) { + statement.setLong(i + 1, (Long) param); + } else if (param instanceof Integer) { + statement.setInt(i + 1, (Integer) param); + } else if (param instanceof Double) { + statement.setDouble(i + 1, (Double) param); + } else if (param instanceof Boolean) { + statement.setBoolean(i + 1, (Boolean) param); + } else if (param instanceof Float) { + statement.setFloat(i + 1, (Float) param); + } else if (param instanceof BigDecimal) { + statement.setBigDecimal(i + 1, (BigDecimal) param); + } else if (param instanceof Byte) { + statement.setByte(i + 1, (Byte) param); + } else if (param instanceof Short) { + statement.setShort(i + 1, (Short) param); + } else if (param instanceof Date) { + statement.setDate(i + 1, (Date) param); + } else if (param instanceof Time) { + statement.setTime(i + 1, (Time) param); + } else if (param instanceof Timestamp) { + statement.setTimestamp(i + 1, (Timestamp) param); + } else if (param instanceof Array) { + statement.setArray(i + 1, (Array) param); + } else { + //extends with other types if needed + 
throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." ); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()]))); + } + } + resultSet = statement.executeQuery(); } catch (SQLException se) { close(); --- End diff -- No need to call `close()`. This should be done by the object that manages the IF's life cycle. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832266 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -19,59 +19,112 @@ package org.apache.flink.api.java.io.jdbc; import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Array; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.flink.api.common.io.NonParallelInput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; +import org.apache.flink.api.table.Row; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.types.NullValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * InputFormat to read data from a database and generate tuples. + * InputFormat to read data from a database and generate Rows. * The InputFormat has to be configured using the supplied InputFormatBuilder. * - * @param - * @see Tuple + * In order to query the JDBC source in parallel, you need to provide a parameterized + * query template (i.e. a valid {@link PreparedStatement}) and a {@link ParameterValuesProvider} + * which provides binding values for the query parameters. 
+ * + * @see Row + * @see ParameterValuesProvider + * @see PreparedStatement * @see DriverManager */ -public class JDBCInputFormat extends RichInputFormatimplements NonParallelInput { - private static final long serialVersionUID = 1L; +public class JDBCInputFormat extends RichInputFormat { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); private String username; private String password; private String drivername; private String dbURL; - private String query; + private String queryTemplate; private int resultSetType; private int resultSetConcurrency; private transient Connection dbConn; - private transient Statement statement; + private transient PreparedStatement statement; private transient ResultSet resultSet; - - private int[] columnTypes = null; - + + private boolean hasNext = true; + private Object[][] parameterValues; + public JDBCInputFormat() { } @Override public void configure(Configuration parameters) { + //do nothing here + } + + @Override + public void openInputFormat() { + //called once per inputFormat (on open) + try { + Class.forName(drivername); + if (username == null) { + dbConn = DriverManager.getConnection(dbURL); + } else { + dbConn = DriverManager.getConnection(dbURL, username, password); + } + statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); + } catch (SQLException se) { + throw new IllegalArgumentException("open() failed." + se.getMessage(), se); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe); + } + } + + @Override + public void closeInputFormat() { + //called once per inputFormat (on close) + try { + statement.close(); + } catch (SQLException se) { + LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage()); + } catch (NullPointerException npe) { --- End diff -- Check for `statement == null` instead of catching a NPE. 
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832299 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -19,59 +19,112 @@ package org.apache.flink.api.java.io.jdbc; import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Array; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.flink.api.common.io.NonParallelInput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; +import org.apache.flink.api.table.Row; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.types.NullValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * InputFormat to read data from a database and generate tuples. + * InputFormat to read data from a database and generate Rows. * The InputFormat has to be configured using the supplied InputFormatBuilder. * - * @param - * @see Tuple + * In order to query the JDBC source in parallel, you need to provide a parameterized + * query template (i.e. a valid {@link PreparedStatement}) and a {@link ParameterValuesProvider} + * which provides binding values for the query parameters. 
+ * + * @see Row + * @see ParameterValuesProvider + * @see PreparedStatement * @see DriverManager */ -public class JDBCInputFormat extends RichInputFormatimplements NonParallelInput { - private static final long serialVersionUID = 1L; +public class JDBCInputFormat extends RichInputFormat { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); private String username; private String password; private String drivername; private String dbURL; - private String query; + private String queryTemplate; private int resultSetType; private int resultSetConcurrency; private transient Connection dbConn; - private transient Statement statement; + private transient PreparedStatement statement; private transient ResultSet resultSet; - - private int[] columnTypes = null; - + + private boolean hasNext = true; + private Object[][] parameterValues; + public JDBCInputFormat() { } @Override public void configure(Configuration parameters) { + //do nothing here + } + + @Override + public void openInputFormat() { + //called once per inputFormat (on open) + try { + Class.forName(drivername); + if (username == null) { + dbConn = DriverManager.getConnection(dbURL); + } else { + dbConn = DriverManager.getConnection(dbURL, username, password); + } + statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); + } catch (SQLException se) { + throw new IllegalArgumentException("open() failed." + se.getMessage(), se); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe); + } + } + + @Override + public void closeInputFormat() { + //called once per inputFormat (on close) + try { + statement.close(); + } catch (SQLException se) { + LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage()); + } catch (NullPointerException npe) { + } finally { + statement = null; + } + + try { +
[GitHub] flink pull request: Flink 3750 fixed
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1941#discussion_r62832218 --- Diff: flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java --- @@ -81,25 +134,51 @@ public void configure(Configuration parameters) { * @throws IOException */ @Override - public void open(InputSplit ignored) throws IOException { + public void open(InputSplit inputSplit) throws IOException { --- End diff -- JavaDocs need to be updated for new parameter name ---