[GitHub] flink pull request: [FLINK-1979] Lossfunctions

2016-05-11 Thread skavulya
Github user skavulya commented on the pull request:

https://github.com/apache/flink/pull/656#issuecomment-218651924
  
Thanks @thvasilo! I created the PR https://github.com/apache/flink/pull/1985




[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and...

2016-05-11 Thread skavulya
GitHub user skavulya opened a pull request:

https://github.com/apache/flink/pull/1985

[FLINK-1979] Add logistic loss, hinge loss and regularization penalties 

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skavulya/flink loss-functions2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1985.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1985


commit 014108ac9ea2e46ac021e9bf3824624d54357f74
Author: spkavuly 
Date:   2016-05-11T22:55:13Z

Add logistic loss, hinge loss and regularization penalties for optimization






[GitHub] flink pull request: [FLINK-3768] [gelly] Clustering Coefficient

2016-05-11 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1896#issuecomment-218637517
  
Thanks for the docs update.




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-11 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/1984

[FLINK-3889] Make File Monitoring Function checkpointable.

This pull request introduces the underlying functionality to make Streaming 
File Sources persistent. 
It does not yet change the API calls, as this will be done after agreeing 
on the current architecture and 
implementation.

In addition, this PR includes a commit for FLINK-3896, which allows an 
operator to cancel its containing task. The need for this functionality came up 
during a discussion with @StephanEwen and @aljoscha, and it is kept as a separate commit.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink ft_files

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1984.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1984


commit 7deb92236cec47ddcfbb3abfa396fd9d15f770b7
Author: kl0u 
Date:   2016-05-10T16:56:58Z

[FLINK-3896] Allow a StreamTask to be Externally Cancelled

It adds a method failExternally() to the StreamTask, so that custom 
Operators
can make their containing task fail when needed.
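
For illustration only, a minimal sketch of how a custom operator might use such a 
hook; getContainingTask() and the exact failExternally() signature are assumptions 
based on this commit message, not the reviewed code:

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Hypothetical user operator that fails its containing StreamTask on bad input.
    public class FailFastOperator<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T> {

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            if (element.getValue() == null) {
                // Assumed hook from this commit: let the containing task fail cleanly
                // instead of throwing from a thread the task does not own.
                getContainingTask().failExternally(new RuntimeException("null element"));
            } else {
                output.collect(element);
            }
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }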

commit c9682b7606451c4eecf6f2f6df9a498fb6d39577
Author: kl0u 
Date:   2016-04-10T14:56:42Z

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentChannelState() returns where the reader is
in the underlying source, and they can resume reading from
a user-specified position.

This functionality is not yet leveraged by current readers.
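
A rough sketch of the shape such an interface could take; only 
getCurrentChannelState() is named in the commit message, so the split/state type 
parameters and the reopen method below are assumptions, not the reviewed code:

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.flink.core.io.InputSplit;

    // Sketch: an input format whose read position can be checkpointed and restored.
    public interface CheckpointableInputFormat<S extends InputSplit, T extends Serializable> {

        // Where the reader currently is in the underlying source (e.g. an offset in the split).
        T getCurrentChannelState() throws IOException;

        // Assumed counterpart: reopen the split and resume from a previously returned state.
        void reopen(S split, T state) throws IOException;
    }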

commit cbbfd8d7e6db0f8f114675b4047aecb94996e500
Author: kl0u 
Date:   2016-04-18T14:37:54Z

[FLINK-3889][FLINK-3808] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 monitoring a
directory and assigning input splits to downstream readers.

In addition, it makes the new features added by
FLINK-3717 and FLINK-3808 work together. Now we have
a file monitoring source that is also fault tolerant
and can guarantee exactly once semantics.

This does not replace the old API calls. This
will be done in a future commit.






[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280969#comment-15280969
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


[~rmetzger]
Hi Robert,
Can you tell me how you were testing the Kinesis connector in AWS EMR before? 
I'd like to try it myself too when I get back, also to learn the process :) 
Thanks.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some Kinesis vs. Kafka differences 
> are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
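
As a rough, hedged sketch of the SDK-based consumption described above (stream 
name, shard id, and sequence number are placeholders; this is not the connector 
code), reading one shard from a specific sequence number with the low-level AWS 
SDK could look like this:

    import com.amazonaws.services.kinesis.AmazonKinesisClient;
    import com.amazonaws.services.kinesis.model.GetRecordsRequest;
    import com.amazonaws.services.kinesis.model.GetRecordsResult;
    import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
    import com.amazonaws.services.kinesis.model.Record;

    public class ShardReadSketch {
        public static void main(String[] args) {
            AmazonKinesisClient kinesis = new AmazonKinesisClient();

            // Start reading a single shard at a checkpointed sequence number.
            GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
                .withStreamName("my-stream")                      // placeholder
                .withShardId("shardId-000000000000")              // placeholder
                .withShardIteratorType("AT_SEQUENCE_NUMBER")
                .withStartingSequenceNumber("49550000000000000"); // placeholder

            String shardIterator = kinesis.getShardIterator(iteratorRequest).getShardIterator();

            // Poll the shard; each result carries the iterator for the next call.
            while (shardIterator != null) {
                GetRecordsResult result = kinesis.getRecords(
                    new GetRecordsRequest().withShardIterator(shardIterator).withLimit(100));
                for (Record record : result.getRecords()) {
                    // deserialize record.getData() and emit downstream
                }
                shardIterator = result.getNextShardIterator();
            }
        }
    }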





[jira] [Updated] (FLINK-3715) Move Accumulating/Discarding from Trigger to WindowOperator

2016-05-11 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-3715:
--
Description: 
As mentioned in 
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.pyk1sn8f33q2
 we should move the decision of whether to {{PURGE}} a window upon firing from 
the {{Trigger}} to the {{WindowOperator}}. This also requires to add API so 
that the user can specify whether windows should be purged upon trigger firing 
(discarding) or kept (accumulating).

As mentioned in the above doc, the {{Trigger}} can react with 4 results right 
now: {{CONTINUE}}, {{FIRE}}, {{PURGE}}, {{FIRE_AND_PURGE}}. The behavior of a 
trigger is not apparent if not looking at the code of the trigger, this has 
confused a number of users. With the new regime, a {{Trigger}} can just decide 
whether to {{CONTINUE}} or {{FIRE}}. The setting of accumulating/discarding 
decides whether to purge the window or keep it.

This depends on FLINK-3714 where we introduce an "allowed lateness" setting. 
Having a choice between accumulating and discarding only makes sense with an 
allowed lateness greater zero. Otherwise no late elements could ever arrive 
that would go into the kept windows.

  was:
As mentioned in 
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.pyk1sn8f33q2
 we should move the decision of whether to {{PURGE}} a window upon firing from 
the {{Trigger}} to the {{WindowOperato}}. This also requires to add API so that 
the user can specify whether windows should be purged upon trigger firing 
(discarding) or kept (accumulating).

As mentioned in the above doc, the {{Trigger}} can react with 4 results right 
now: {{CONTINUE}}, {{FIRE}}, {{PURGE}}, {{FIRE_AND_PURGE}}. The behavior of a 
trigger is not apparent if not looking at the code of the trigger, this has 
confused a number of users. With the new regime, a {{Trigger}} can just decide 
whether to {{CONTINUE}} or {{FIRE}}. The setting of accumulating/discarding 
decides whether to purge the window or keep it.

This depends on FLINK-3714 where we introduce an "allowed lateness" setting. 
Having a choice between accumulating and discarding only makes sense with an 
allowed lateness greater zero. Otherwise no late elements could ever arrive 
that would go into the kept windows.


> Move Accumulating/Discarding from Trigger to WindowOperator
> ---
>
> Key: FLINK-3715
> URL: https://issues.apache.org/jira/browse/FLINK-3715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> As mentioned in 
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.pyk1sn8f33q2
>  we should move the decision of whether to {{PURGE}} a window upon firing 
> from the {{Trigger}} to the {{WindowOperator}}. This also requires to add API 
> so that the user can specify whether windows should be purged upon trigger 
> firing (discarding) or kept (accumulating).
> As mentioned in the above doc, the {{Trigger}} can react with 4 results right 
> now: {{CONTINUE}}, {{FIRE}}, {{PURGE}}, {{FIRE_AND_PURGE}}. The behavior of a 
> trigger is not apparent if not looking at the code of the trigger, this has 
> confused a number of users. With the new regime, a {{Trigger}} can just 
> decide whether to {{CONTINUE}} or {{FIRE}}. The setting of 
> accumulating/discarding decides whether to purge the window or keep it.
> This depends on FLINK-3714 where we introduce an "allowed lateness" setting. 
> Having a choice between accumulating and discarding only makes sense with an 
> allowed lateness greater zero. Otherwise no late elements could ever arrive 
> that would go into the kept windows.





[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm

2016-05-11 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280826#comment-15280826
 ] 

Greg Hogan commented on FLINK-3879:
---

The current expectation for FLINK-2044 is that the user will have some 
intuition on how many iterations to run. If there is a specific best value then 
it should be hard-coded in the program; otherwise, we should provide guidance 
on how to select the number of iterations. I do not see discussion of a 
convergence threshold in the HITS paper, but borrowing this aspect of PageRank 
seems appropriate. A threshold of 1E-9 requires that the average score is 
changing by less than one part in a billion. That seems easier to conceptualize 
than setting the number of iterations.
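
A tiny illustration of what such a test amounts to (not code from either 
implementation): with a threshold of 1E-9, iteration stops once the summed 
absolute score change, averaged over the vertices, drops below one part in a billion.

    // Hypothetical convergence check matching the description above.
    static boolean hasConverged(double sumOfAbsoluteScoreChanges, long vertexCount) {
        final double threshold = 1e-9;
        return sumOfAbsoluteScoreChanges / vertexCount < threshold;
    }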

I think a benchmark is always good for context when comparing implementations.

When does Scatter-Gather outperform GSA? I see that some algorithms cannot be 
implemented using GSA, but that is clearly not the case for these algorithms.

The only delta iteration I am familiar with is Stephan's implementation of 
PageRank. Are you thinking of a similar modification to HITS? It should be 
straightforward to implement this and compare techniques for HITS and PageRank. 
Convergence should not be affected, but vertex scores below a given threshold 
will stop propagating deltas.
  https://gist.github.com/StephanEwen/2b1a4c9812ac46abc8f0

> Native implementation of HITS algorithm
> ---
>
> Key: FLINK-3879
> URL: https://issues.apache.org/jira/browse/FLINK-3879
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is 
> presented in [0] and described in [1].
> "[HITS] is a very popular and effective algorithm to rank documents based on 
> the link information among a set of documents. The algorithm presumes that a 
> good hub is a document that points to many others, and a good authority is a 
> document that many documents point to." 
> [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf]
> This implementation differs from FLINK-2044 by providing for convergence, 
> outputting both hub and authority scores, and completing in half the number 
> of iterations.
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf
> [1] https://en.wikipedia.org/wiki/HITS_algorithm





[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-218543446
  
@greghogan thanks, the relevant code has been modified. :)




[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280541#comment-15280541
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-218543446
  
@greghogan thanks, the relevant code has been modified. :)


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of HITS algorithm in Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)





[jira] [Commented] (FLINK-3868) Specialized CopyableValue serializers and comparators

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280496#comment-15280496
 ] 

ASF GitHub Bot commented on FLINK-3868:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1983

[FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators, many 
of which were already present.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3868_specialized_copyablevalue_serializers_and_comparators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1983.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1983


commit c3359933b67f4488e1ed7cd4f2632ee21cdb548e
Author: Greg Hogan 
Date:   2016-05-04T20:56:16Z

[FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators,
many of which were already present.




> Specialized CopyableValue serializers and comparators
> -
>
> Key: FLINK-3868
> URL: https://issues.apache.org/jira/browse/FLINK-3868
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> This need was discussed on the mailing list [1] and will be obviated by code 
> generation for POJO serializers and comparators [2] (as I understand, i.e., 
> {{LongValue}} will now simply be treated as a POJO which happens to contain a 
> {{long}} and a specialized serializer and comparator will be generated).
> In the meantime, and since {{ValueTypeInfo}} will need to be reworked to use 
> the new generators, I think it is worthwhile to add specialized serializers 
> and comparators for the {{CopyableValue}} types.
> This will also provide another point of comparison for the performance of the 
> generated serializers and comparators.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://issues.apache.org/jira/browse/FLINK-3599





[GitHub] flink pull request: [FLINK-3868] [core] Specialized CopyableValue ...

2016-05-11 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1983

[FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators, many 
of which were already present.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3868_specialized_copyablevalue_serializers_and_comparators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1983.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1983


commit c3359933b67f4488e1ed7cd4f2632ee21cdb548e
Author: Greg Hogan 
Date:   2016-05-04T20:56:16Z

[FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators,
many of which were already present.






[jira] [Updated] (FLINK-3868) Specialized CopyableValue serializers and comparators

2016-05-11 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3868:
--
Fix Version/s: 1.1.0

> Specialized CopyableValue serializers and comparators
> -
>
> Key: FLINK-3868
> URL: https://issues.apache.org/jira/browse/FLINK-3868
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> This need was discussed on the mailing list [1] and will be obviated by code 
> generation for POJO serializers and comparators [2] (as I understand, i.e., 
> {{LongValue}} will now simply be treated as a POJO which happens to contain a 
> {{long}} and a specialized serializer and comparator will be generated).
> In the meantime, and since {{ValueTypeInfo}} will need to be reworked to use 
> the new generators, I think it is worthwhile to add specialized serializers 
> and comparators for the {{CopyableValue}} types.
> This will also provide another point of comparison for the performance of the 
> generated serializers and comparators.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://issues.apache.org/jira/browse/FLINK-3599





[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218511092
  
@fhueske @twalthr thanks for the review work! I've updated the code just 
now.
Looking forward to more feedback on this :)




[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280348#comment-15280348
 ] 

ASF GitHub Bot commented on FLINK-3836:
---

Github user mbode closed the pull request at:

https://github.com/apache/flink/pull/1966


> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be <Integer, Long>.





[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280349#comment-15280349
 ] 

ASF GitHub Bot commented on FLINK-3836:
---

GitHub user mbode reopened a pull request:

https://github.com/apache/flink/pull/1966

[FLINK-3836] Add LongHistogram accumulator

New accumulator `LongHistogram`; the `Histogram` accumulator now throws an 
`IllegalArgumentException` instead of letting the int overflow. 
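
A hedged sketch of the overflow guard described here (the exact bookkeeping in 
the PR may differ):

    import java.util.TreeMap;

    // Sketch: int-based histogram that fails fast instead of wrapping around.
    public class OverflowCheckingHistogram {

        private final TreeMap<Integer, Integer> counts = new TreeMap<>();

        public void add(int value) {
            Integer current = counts.get(value);
            if (current == null) {
                counts.put(value, 1);
            } else if (current == Integer.MAX_VALUE) {
                // Previously the int count would silently overflow; now the caller is told
                // to switch to the Long-based accumulator.
                throw new IllegalArgumentException(
                    "Histogram count for " + value + " would overflow; use LongHistogram instead.");
            } else {
                counts.put(value, current + 1);
            }
        }
    }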

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mbode/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1966


commit f457319481701a1234c9ea7d29da24f857ae4241
Author: Maximilian Bode 
Date:   2016-04-27T15:19:16Z

[Flink-3836] Add LongHistogram accumulator




> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be <Integer, Long>.





[GitHub] flink pull request: [FLINK-3836] Add LongHistogram accumulator

2016-05-11 Thread mbode
GitHub user mbode reopened a pull request:

https://github.com/apache/flink/pull/1966

[FLINK-3836] Add LongHistogram accumulator

New accumulator `LongHistogram`; the `Histogram` accumulator now throws an 
`IllegalArgumentException` instead of letting the int overflow. 

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mbode/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1966


commit f457319481701a1234c9ea7d29da24f857ae4241
Author: Maximilian Bode 
Date:   2016-04-27T15:19:16Z

[Flink-3836] Add LongHistogram accumulator






[GitHub] flink pull request: [FLINK-3836] Add LongHistogram accumulator

2016-05-11 Thread mbode
Github user mbode closed the pull request at:

https://github.com/apache/flink/pull/1966




[jira] [Closed] (FLINK-2310) Add an Adamic-Adar Similarity example

2016-05-11 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-2310.
-
Resolution: Later

Work on this algorithm is continuing in FLINK-3898.

> Add an Adamic-Adar Similarity example
> -
>
> Key: FLINK-2310
> URL: https://issues.apache.org/jira/browse/FLINK-2310
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Andra Lungu
>Assignee: Shivani Ghatge
>Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
> set of nodes. However, instead of counting the common neighbors and dividing 
> them by the total number of neighbors, the similarity is weighted according 
> to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
> The Adamic-Adar algorithm can be broken into three steps: 
> 1). For each vertex, compute the log of its inverse degrees (with the formula 
> above) and set it as the vertex value. 
> 2). Each vertex will then send this new computed value along with a list of 
> neighbors to the targets of its out-edges
> 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
> log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
> the degree of node n). See [2]
> Prerequisites: 
> - Full understanding of the Jaccard Similarity Measure algorithm
> - Reading the associated literature: 
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] 
> http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar





[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280280#comment-15280280
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62868637
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represents a page that points to many 
other pages, and a good authority
+ * represented a page that is linked by many different hubs.
+ * Each vertex has a value of Tuple2 type, the first field is hub score 
and the second field is authority score.
+ * The implementation assumes that the two score are the same in each 
vertex at the beginning.
+ * 
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
+public class HITSAlgorithm implements GraphAlgorithm 0, "The number of 
maximum iteration should be greater than 0.");
+   this.maxIterations = maxIterations * 2 + 1;
+   }
+
+   @Override
+   public DataSet>> run(Graph 
netGraph) throws Exception {
+
+   ScatterGatherConfiguration parameter = new 
ScatterGatherConfiguration();
+   parameter.setDirection(EdgeDirection.ALL);
+   parameter.registerAggregator("sumVertexValue", new 
DoubleSumAggregator());
+
+   return netGraph
+   .mapVertices(new VertexInitMapper())
+   .mapEdges(new NullValueEdgeMapper())
--- End diff --

Yes, and I will modify the code.


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of HITS algorithm in Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)





[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread gallenvara
Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62868637
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represents a page that points to many 
other pages, and a good authority
+ * represented a page that is linked by many different hubs.
+ * Each vertex has a value of Tuple2 type, the first field is hub score 
and the second field is authority score.
+ * The implementation assumes that the two score are the same in each 
vertex at the beginning.
+ * 
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
+public class HITSAlgorithm implements GraphAlgorithm 0, "The number of 
maximum iteration should be greater than 0.");
+   this.maxIterations = maxIterations * 2 + 1;
+   }
+
+   @Override
+   public DataSet>> run(Graph 
netGraph) throws Exception {
+
+   ScatterGatherConfiguration parameter = new 
ScatterGatherConfiguration();
+   parameter.setDirection(EdgeDirection.ALL);
+   parameter.registerAggregator("sumVertexValue", new 
DoubleSumAggregator());
+
+   return netGraph
+   .mapVertices(new VertexInitMapper())
+   .mapEdges(new NullValueEdgeMapper())
--- End diff --

Yes, and I will modify the code.




[jira] [Closed] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters

2016-05-11 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri closed FLINK-2375.

Resolution: Won't Fix

This technique can be considered as part of FLINK-3898.

> Add Approximate Adamic Adar Similarity method using BloomFilters
> 
>
> Key: FLINK-2375
> URL: https://issues.apache.org/jira/browse/FLINK-2375
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Shivani Ghatge
>Assignee: Shivani Ghatge
>Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
> set of nodes. However, instead of counting the common neighbors and dividing 
> them by the total number of neighbors, the similarity is weighted according 
> to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
> The Adamic-Adar algorithm can be broken into three steps:
> 1). For each vertex, compute the log of its inverse degrees (with the formula 
> above) and set it as the vertex value.
> 2). Each vertex will then send this new computed value along with a list of 
> neighbors to the targets of its out-edges
> 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
> log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
> the degree of node n). See [2]
> Using BloomFilters we increase the scalability of the algorithm. The values 
> calculated for the edges will be approximate.
> Prerequisites:
> Full understanding of the Jaccard Similarity Measure algorithm
> Reading the associated literature:
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] 
> http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar





[jira] [Closed] (FLINK-2634) Add a Vertex-centric Version of the Tringle Count Library Method

2016-05-11 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri closed FLINK-2634.

Resolution: Won't Fix

There is currently no need or demand for this version. The current 
TriangleEnumerator covers this functionality.

> Add a Vertex-centric Version of the Tringle Count Library Method
> 
>
> Key: FLINK-2634
> URL: https://issues.apache.org/jira/browse/FLINK-2634
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Andra Lungu
>Priority: Minor
>
> The vertex-centric version of this algorithm receives an undirected graph as 
> input and outputs the total number of triangles formed by the graph's edges.
> The implementation consists of three phases:
> 1). Select neighbours with id greater than the current vertex id.
> 2). Propagate each received value to neighbours with higher id. 
> 3). Compute the number of Triangles by verifying if the final vertex contains 
> the sender's id in its list.
> As opposed to the GAS version, all these three steps will be performed via 
> message passing. 





[jira] [Commented] (FLINK-3898) Adamic-Adar Similarity

2016-05-11 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280260#comment-15280260
 ] 

Vasia Kalavri commented on FLINK-3898:
--

I guess we can close FLINK-2310 as a duplicate? I know that nobody is working 
on it.

> Adamic-Adar Similarity
> --
>
> Key: FLINK-3898
> URL: https://issues.apache.org/jira/browse/FLINK-3898
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> The implementation of Adamic-Adar Similarity [0] is very close to Jaccard 
> Similarity. Whereas Jaccard Similarity counts common neighbors, Adamic-Adar 
> Similarity sums the inverse logarithm of the degree of common neighbors.
> Consideration will be given to the computation of the inverse logarithm, in 
> particular whether to pre-compute a small array of values.
> [0] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
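
One possible form of the pre-computation mentioned above, purely as a sketch 
(the table cut-off and class name are assumptions, not the merged code): cache 
1/log(d) for small degrees and fall back to computing it for larger ones.

    // Sketch: precomputed inverse-logarithm weights for common-neighbor degrees.
    public class InverseLogTable {

        private final double[] table;

        public InverseLogTable(int maxCachedDegree) {
            table = new double[maxCachedDegree + 1];
            for (int d = 2; d <= maxCachedDegree; d++) {
                table[d] = 1.0 / Math.log(d);
            }
        }

        // Weight contributed by a common neighbor of degree d (d >= 2).
        public double weight(int d) {
            return d < table.length ? table[d] : 1.0 / Math.log(d);
        }
    }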





[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62863450
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represents a page that points to many 
other pages, and a good authority
+ * represented a page that is linked by many different hubs.
+ * Each vertex has a value of Tuple2 type, the first field is hub score 
and the second field is authority score.
+ * The implementation assumes that the two score are the same in each 
vertex at the beginning.
+ * 
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
+public class HITSAlgorithm implements GraphAlgorithm 0, "The number of 
maximum iteration should be greater than 0.");
+   this.maxIterations = maxIterations * 2 + 1;
+   }
+
+   @Override
+   public DataSet>> run(Graph 
netGraph) throws Exception {
+
+   ScatterGatherConfiguration parameter = new 
ScatterGatherConfiguration();
+   parameter.setDirection(EdgeDirection.ALL);
+   parameter.registerAggregator("sumVertexValue", new 
DoubleSumAggregator());
+
+   return netGraph
+   .mapVertices(new VertexInitMapper())
+   .mapEdges(new NullValueEdgeMapper())
--- End diff --

Mapping to `NullValue` is not necessary if the edge values are already 
`NullValue`. Based on the code in `Translate`, we can 
`TypeInformation<EV> typeInfo = ((TupleTypeInfo<Edge<K, EV>>) 
netGraph.getEdges().getType()).getTypeAt(2);` and then `if 
(typeInfo.getTypeClass().equals(NullValue.class)) ...`




[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm

2016-05-11 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280246#comment-15280246
 ] 

Vasia Kalavri commented on FLINK-3879:
--

[~greghogan]
- Do we agree that the PR for FLINK-2044 is now in good state and could be 
merged? Or would you rather benchmark this against it and go for the most 
performant one?
- Gelly library methods: currently there are scatter-gather and GSA 
implementations for PageRank, Connected Components, and SSSP. We have these 
because GSA performs better for graphs with skewed degree distributions. In the 
Gelly docs-[iteration abstractions 
comparison|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html#iteration-abstractions-comparison],
 we describe when GSA should be preferred over scatter-gather. Maybe we can 
make this more explicit.
There is no Pregel implementation (only in examples). The {{GSATriangleCount}} 
library method has proved to be very inefficient and should be removed imo 
(I'll open a JIRA).
- I'm not sure what you mean by "approximate HITS"?

> Native implementation of HITS algorithm
> ---
>
> Key: FLINK-3879
> URL: https://issues.apache.org/jira/browse/FLINK-3879
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is 
> presented in [0] and described in [1].
> "[HITS] is a very popular and effective algorithm to rank documents based on 
> the link information among a set of documents. The algorithm presumes that a 
> good hub is a document that points to many others, and a good authority is a 
> document that many documents point to." 
> [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf]
> This implementation differs from FLINK-2044 by providing for convergence, 
> outputting both hub and authority scores, and completing in half the number 
> of iterations.
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf
> [1] https://en.wikipedia.org/wiki/HITS_algorithm





[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62863557
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represents a page that points to many 
other pages, and a good authority
+ * represented a page that is linked by many different hubs.
+ * Each vertex has a value of Tuple2 type, the first field is hub score 
and the second field is authority score.
+ * The implementation assumes that the two score are the same in each 
vertex at the beginning.
+ * 
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
+public class HITSAlgorithm implements GraphAlgorithm 0, "The number of 
maximum iteration should be greater than 0.");
+   this.maxIterations = maxIterations * 2 + 1;
+   }
+
+   @Override
+   public DataSet>> run(Graph 
netGraph) throws Exception {
--- End diff --

Change `Double` to `DoubleValue`?




[GitHub] flink pull request: [FLINK-3701] reuse serializer lists in Executi...

2016-05-11 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1913#issuecomment-218483592
  
Thanks for your remarks. Instead of serializing/deserializing parts of 
the `ExecutionConfig` whenever the usercode class loader needs to be passed, I 
would like to serialize/deserialize the entire `ExecutionConfig`. This makes 
everything explicit wherever the config is used. Plus, we create an explicit 
copy of the config upon submission, regardless of the Akka object reuse mode. 
The PojoSerializer will just keep a regular copy of the ExecutionConfig because 
we always have the user code class loader available then.
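
A rough illustration of that idea using Flink's SerializedValue utility (a sketch 
of the concept only, not the actual patch):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.util.SerializedValue;

    public class ConfigShippingSketch {
        public static void main(String[] args) throws Exception {
            ExecutionConfig config = new ExecutionConfig();

            // Client side: wrapping the whole config forces an explicit copy on submission,
            // independent of Akka's object-reuse mode.
            SerializedValue<ExecutionConfig> shipped = new SerializedValue<>(config);

            // Task side: deserialize only where the user-code class loader is known, so
            // user-registered types and serializers can be resolved.
            ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
            ExecutionConfig restored = shipped.deserializeValue(userCodeClassLoader);
            System.out.println(restored.getParallelism());
        }
    }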




[GitHub] flink pull request: [Flink-2971][table] Add outer joins to the Tab...

2016-05-11 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1981#issuecomment-218481504
  
Thanks for this PR, @dawidwys! I haven't had a look yet but will do soon. 

Just a quick comment: this PR touches some files that PR #1958 is also 
modifying (mostly on the API level, not the runtime and optimization code). PR 
#1958 is a bigger change but in good shape and should be mergeable soon. I would 
like to merge #1958 before this one, so you will need to rebase once this has 
happened. That should not be too much work, just wanted to let you know in 
advance.

Thanks, Fabian




[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280204#comment-15280204
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218475561
  
@yjshen great work! PR looks very good. I had only some minor refactoring 
comments.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218475561
  
@yjshen great work! PR looks very good. I had only some minor refactoring 
comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280218#comment-15280218
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218477620
  
Hi @twalthr, thanks very much for the review work! I'll resolve your 
comments shortly :)


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218477620
  
Hi @twalthr, thanks very much for the review work! I'll resolve your 
comments shortly :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3776] Flink Scala shell does not allow ...

2016-05-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1945#discussion_r62857420
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -141,7 +141,7 @@ object FlinkShell {
   ): (String, Int, Option[Either[FlinkMiniCluster, 
AbstractFlinkYarnCluster]]) = {
 config.executionMode match {
   case ExecutionMode.LOCAL => // Local mode
-val config = new Configuration()
+val config = GlobalConfiguration.getConfiguration()
--- End diff --

I think the configuration needs to be loaded in the same way as in line 183.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3776) Flink Scala shell does not allow to set configuration for local execution

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280203#comment-15280203
 ] 

ASF GitHub Bot commented on FLINK-3776:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1945#discussion_r62857420
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -141,7 +141,7 @@ object FlinkShell {
   ): (String, Int, Option[Either[FlinkMiniCluster, 
AbstractFlinkYarnCluster]]) = {
 config.executionMode match {
   case ExecutionMode.LOCAL => // Local mode
-val config = new Configuration()
+val config = GlobalConfiguration.getConfiguration()
--- End diff --

I think the configuration needs to be loaded in the same way as in line 183.


> Flink Scala shell does not allow to set configuration for local execution
> -
>
> Key: FLINK-3776
> URL: https://issues.apache.org/jira/browse/FLINK-3776
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> Flink's Scala shell starts a {{LocalFlinkMiniCluster}} with an empty 
> configuration when the shell is started in local mode. In order to allow the 
> user to configure the mini cluster, e.g., number of slots, size of memory, it 
> would be good to forward a user specified configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3776) Flink Scala shell does not allow to set configuration for local execution

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280209#comment-15280209
 ] 

ASF GitHub Bot commented on FLINK-3776:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1945#issuecomment-218476654
  
Thanks for the pull request. Looks good. Have you tested that the 
configuration is loaded correctly?


> Flink Scala shell does not allow to set configuration for local execution
> -
>
> Key: FLINK-3776
> URL: https://issues.apache.org/jira/browse/FLINK-3776
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> Flink's Scala shell starts a {{LocalFlinkMiniCluster}} with an empty 
> configuration when the shell is started in local mode. In order to allow the 
> user to configure the mini cluster, e.g., number of slots, size of memory, it 
> would be good to forward a user specified configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3776] Flink Scala shell does not allow ...

2016-05-11 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1945#issuecomment-218476654
  
Thanks for the pull request. Looks good. Have you tested that the 
configuration is loaded correctly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3733) registeredTypesWithKryoSerializers is not assigned in ExecutionConfig#deserializeUserCode()

2016-05-11 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280193#comment-15280193
 ] 

Maximilian Michels commented on FLINK-3733:
---

I think this might be a duplicate of FLINK-3701.

> registeredTypesWithKryoSerializers is not assigned in 
> ExecutionConfig#deserializeUserCode()
> ---
>
> Key: FLINK-3733
> URL: https://issues.apache.org/jira/browse/FLINK-3733
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> if (serializedRegisteredTypesWithKryoSerializers != null) {
>   registeredTypesWithKryoSerializers = 
> serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
> } else {
>   registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
> }
> {code}
> When serializedRegisteredTypesWithKryoSerializers is null, 
> registeredTypesWithKryoSerializers is not assigned.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280187#comment-15280187
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62856490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala
 ---
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.table
+package org.apache.flink.api.table.expressions
 
-/**
- * General Exception for all errors during table handling.
- */
-class TableException(msg: String) extends RuntimeException(msg)
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
--- End diff --

OK, will do this


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62856490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala
 ---
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.table
+package org.apache.flink.api.table.expressions
 
-/**
- * General Exception for all errors during table handling.
- */
-class TableException(msg: String) extends RuntimeException(msg)
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
--- End diff --

OK, will do this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280178#comment-15280178
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62855247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
--- End diff --

While resolving `Call` into concrete expressions, `FunctionCatalog` is 
used and its `withChildren` method looks up constructors to create the new 
`expression` as follows: 

1. look up a constructor that takes a `Seq[Expression]` as argument
2. if 1 is not fulfilled, look up the constructor that matches exactly the number 
of arguments, like the `MyFunc` example you provided.

Does this make sense?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62855247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
--- End diff --

While resolving `Call` into concrete expressions, `FunctionCatalog` is 
used and its `withChildren` method looks up constructors to create the new 
`expression` as follows: 

1. look up a constructor that takes a `Seq[Expression]` as argument
2. if 1 is not fulfilled, look up the constructor that matches exactly the number 
of arguments, like the `MyFunc` example you provided.

Does this make sense?
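
To make the lookup order concrete, here is a rough Java illustration of the two steps above (the real `FunctionCatalog.withChildren` is Scala and works on `Seq[Expression]`; the class name, `findConstructor`, and the use of `List` are made up for this sketch):

```java
import java.lang.reflect.Constructor;
import java.util.List;

public class ConstructorLookupSketch {
    // Step 1: prefer a constructor that takes the whole list of children.
    // Step 2: otherwise fall back to a constructor whose arity matches the
    // number of children, like the MyFunc example.
    static Constructor<?> findConstructor(Class<?> exprClass, List<?> children)
            throws NoSuchMethodException {
        for (Constructor<?> c : exprClass.getConstructors()) {
            Class<?>[] params = c.getParameterTypes();
            if (params.length == 1 && params[0].isAssignableFrom(List.class)) {
                return c; // accepts the children as a single collection
            }
        }
        for (Constructor<?> c : exprClass.getConstructors()) {
            if (c.getParameterTypes().length == children.size()) {
                return c; // one parameter per child expression
            }
        }
        throw new NoSuchMethodException("No matching constructor on " + exprClass.getName());
    }
}
```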


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62853599
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
 * @param endIndex last character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+SubString(expr, beginIndex, endIndex)
 
   /**
 * Creates a substring of the given string beginning at the given index 
to the end.
 *
 * @param beginIndex first character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+new SubString(expr, beginIndex)
--- End diff --

Since `SubString` is defined as `case class SubString(str: Expression, 
begin: Expression, end: Expression)` and the generated `apply` method has the same 
three-argument signature, the `new` here cannot be removed; otherwise the compiler 
complains that it `cannot resolve subString with such signature`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62854701
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.example;
+
+import java.sql.Types;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import 
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
+import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.api.java.io.jdbc.JDBCTestBase;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.junit.Test;
+
+public class JDBCFullTest extends JDBCTestBase {
+   
+   @Test
+   public void test() throws Exception {
--- End diff --

Before my PR this class was not even a test, the code was inside a 
main(). I thought it was at least better than that but I didn't want to spend 
more time on this... is it a problem if we leave it as it is?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62851933
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
--- End diff --

The reason why I used var-args here was that it is easier to use in the 
API once custom functions are possible: `.filter(Call("MYFUNC", 'f1, 'f2))`. 
But we can also find another way later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280165#comment-15280165
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62853599
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
 * @param endIndex last character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+SubString(expr, beginIndex, endIndex)
 
   /**
 * Creates a substring of the given string beginning at the given index 
to the end.
 *
 * @param beginIndex first character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+new SubString(expr, beginIndex)
--- End diff --

Since `SubString` is defined as `case class SubString(str: Expression, 
begin: Expression, end: Expression)` and the generated `apply` method has the same 
three-argument signature, the `new` here cannot be removed; otherwise the compiler 
complains that it `cannot resolve subString with such signature`.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280168#comment-15280168
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62853748
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object TypeCoercion {
--- End diff --

You are right, will fix this.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62853748
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object TypeCoercion {
--- End diff --

You are right, will fix this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3780) Jaccard Similarity

2016-05-11 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3780:
--
Fix Version/s: 1.1.0

> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.
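
For reference, the score itself is simple arithmetic; a tiny sketch (names made up for illustration) where `shared` is the number of common neighbors of the two endpoints:

```java
public class JaccardScoreSketch {
    // Jaccard similarity = |intersection| / |union|; the union size of the two
    // neighborhoods is degreeU + degreeV - shared.
    static double jaccard(long shared, long degreeU, long degreeV) {
        return (double) shared / (degreeU + degreeV - shared);
    }

    public static void main(String[] args) {
        // Endpoints of degree 4 and 5 sharing 3 neighbors: 3 / (4 + 5 - 3) = 0.5
        System.out.println(jaccard(3, 4, 5));
    }
}
```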



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3898) Adamic-Adar Similarity

2016-05-11 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-3898:
-

 Summary: Adamic-Adar Similarity
 Key: FLINK-3898
 URL: https://issues.apache.org/jira/browse/FLINK-3898
 Project: Flink
  Issue Type: New Feature
  Components: Gelly
Affects Versions: 1.1.0
Reporter: Greg Hogan
Assignee: Greg Hogan


The implementation of Adamic-Adar Similarity [0] is very close to Jaccard 
Similarity. Whereas Jaccard Similarity counts common neighbors, Adamic-Adar 
Similarity sums the inverse logarithm of the degree of common neighbors.

Consideration will be given to the computation of the inverse logarithm, in 
particular whether to pre-compute a small array of values.

[0] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
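
As a sketch of the pre-computation idea mentioned above (table size and fallback are arbitrary choices for illustration, not the planned implementation):

```java
public class InverseLogTableSketch {
    public static void main(String[] args) {
        // Pre-compute 1 / ln(d) for small degrees d; a common neighbor of
        // degree d contributes this amount to the Adamic-Adar score.
        int tableSize = 1 << 16;
        double[] inverseLog = new double[tableSize];
        for (int d = 2; d < tableSize; d++) {
            inverseLog[d] = 1.0 / Math.log(d);
        }

        // Degrees beyond the table fall back to computing the value directly.
        int degree = 42;
        double contribution = degree < tableSize ? inverseLog[degree] : 1.0 / Math.log(degree);
        System.out.println(contribution);
    }
}
```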



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62852662
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
 * @throws java.io.IOException
 */
@Override
-   public OUT nextRecord(OUT tuple) throws IOException {
+   public Row nextRecord(Row row) throws IOException {
try {
-   resultSet.next();
-   if (columnTypes == null) {
-   extractTypes(tuple);
+   hasNext = resultSet.next();
+   if (!hasNext) {
+   return null;
+   }
+   try {
+   //This throws a NPE when the TypeInfo is not 
passed to the InputFormat,
+   //i.e. KryoSerializer used to generate the 
passed row
+   row.productArity();
--- End diff --

because it can happen that the row != null but its contents are uninitialized 
(see for example **JDBCInputFormatTest.testUninitializedRow**).

That happens when you do the following (for example):
``` java
DataSet<Row> source = environment.createInput(inputBuilder.finish());
```

The only method I found to detect such a situation is to catch the 
NullPointerException generated when trying to access one of its methods, 
e.g. 
```java
row.productArity()
```
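
A small sketch of that detection trick as a guard helper (class and method names are made up; the PR's actual check lives inside `nextRecord`):

```java
import java.io.IOException;

import org.apache.flink.api.table.Row;

public class RowGuardSketch {
    // A Row instantiated by Kryo without the RowTypeInfo can be non-null yet
    // have no backing fields, so even productArity() throws a NullPointerException.
    static Row ensureInitialized(Row row) throws IOException {
        try {
            row.productArity();
            return row;
        } catch (NullPointerException npe) {
            throw new IOException(
                "Row is uninitialized; pass the RowTypeInfo to the InputFormat.", npe);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(ensureInitialized(new Row(2)).productArity()); // prints 2
    }
}
```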



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280151#comment-15280151
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62852367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
 ---
@@ -20,19 +20,55 @@ package org.apache.flink.api.table.expressions
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, 
TypeInformation}
 import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate._
 
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends 
UnaryExpression {
+case class Cast(child: Expression, resultType: TypeInformation[_]) extends 
UnaryExpression {
 
-  override def toString = s"$child.cast($tpe)"
+  override def toString = s"$child.cast($resultType)"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe))
+relBuilder.cast(child.toRexNode, 
TypeConverter.typeInfoToSqlType(resultType))
   }
 
-  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
+  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
 val child: Expression = anyRefs.head.asInstanceOf[Expression]
-copy(child, tpe).asInstanceOf[this.type]
+copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override def validateInput(): ExprValidationResult = {
+if (Cast.canCast(child.resultType, resultType)) {
+  ValidationSuccess
+} else {
+  ValidationFailure(s"Unsupported cast from ${child.resultType} to 
$resultType")
+}
+  }
+}
+
+object Cast {
--- End diff --

I would also move this into `typeutils`. We may need it somewhere else 
too.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280150#comment-15280150
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-218467237
  
@vasia Thanks a lot and PR has been updated as you advised.


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of Hits Algorithm in Gelly API using Java. the feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62852367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
 ---
@@ -20,19 +20,55 @@ package org.apache.flink.api.table.expressions
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, 
TypeInformation}
 import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate._
 
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends 
UnaryExpression {
+case class Cast(child: Expression, resultType: TypeInformation[_]) extends 
UnaryExpression {
 
-  override def toString = s"$child.cast($tpe)"
+  override def toString = s"$child.cast($resultType)"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe))
+relBuilder.cast(child.toRexNode, 
TypeConverter.typeInfoToSqlType(resultType))
   }
 
-  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
+  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
 val child: Expression = anyRefs.head.asInstanceOf[Expression]
-copy(child, tpe).asInstanceOf[this.type]
+copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override def validateInput(): ExprValidationResult = {
+if (Cast.canCast(child.resultType, resultType)) {
+  ValidationSuccess
+} else {
+  ValidationFailure(s"Unsupported cast from ${child.resultType} to 
$resultType")
+}
+  }
+}
+
+object Cast {
--- End diff --

I would also move this into `typeutils`. We may need it somewhere else 
too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-218467237
  
@vasia Thanks a lot and PR has been updated as you advised.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280145#comment-15280145
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62851933
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
--- End diff --

The reason why I used var-args here was that it is easier to use in the 
API once custom functions are possible: `.filter(Call("MYFUNC", 'f1, 'f2))`. 
But we can also find another way later.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280140#comment-15280140
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62851129
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
 
   override def children: Seq[Expression] = args
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-relBuilder.call(
-  BuiltInFunctionNames.toSqlOperator(functionName),
-  args.map(_.toRexNode): _*)
+throw new UnresolvedException(s"trying to convert UnresolvedFunction 
$functionName to RexNode")
   }
 
   override def toString = s"\\$functionName(${args.mkString(", ")})"
 
-  override def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-val copy = Call(
-  newArgs.head.asInstanceOf[String],
-  newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*)
+  override def resultType =
+throw new UnresolvedException(s"calling dataType on Unresolved 
Function $functionName")
--- End diff --

resultType instead of dataType. Space between Unresolved Function.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62851129
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
 
   override def children: Seq[Expression] = args
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-relBuilder.call(
-  BuiltInFunctionNames.toSqlOperator(functionName),
-  args.map(_.toRexNode): _*)
+throw new UnresolvedException(s"trying to convert UnresolvedFunction 
$functionName to RexNode")
   }
 
   override def toString = s"\\$functionName(${args.mkString(", ")})"
 
-  override def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-val copy = Call(
-  newArgs.head.asInstanceOf[String],
-  newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*)
+  override def resultType =
+throw new UnresolvedException(s"calling dataType on Unresolved 
Function $functionName")
--- End diff --

resultType instead of dataType. Space between Unresolved Function.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm

2016-05-11 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280134#comment-15280134
 ] 

Greg Hogan commented on FLINK-3879:
---

Implementations which merely showcase the use of Gelly graph models seem most 
appropriate as examples. Do we have examples of inputs which perform better as 
GSA vs SG vs Pregel? I am not finding any direct guidance in the documentation 
for a user looking to choose between duplicate library algorithms.

FLINK-3879 will be faster unless one of the current models is extended or a new 
graph model is created to process out- and in- edges separately in the same 
iteration or to allow disabling operators on certain supersteps.

An approximate HITS using delta iterations would be as easy to implement 
natively as with GSA. Before accepting such an implementation I would like to 
see evidence that performing more approximate iterations converges more quickly 
when compared with running fewer bulk iterations.

> Native implementation of HITS algorithm
> ---
>
> Key: FLINK-3879
> URL: https://issues.apache.org/jira/browse/FLINK-3879
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is 
> presented in [0] and described in [1].
> "[HITS] is a very popular and effective algorithm to rank documents based on 
> the link information among a set of documents. The algorithm presumes that a 
> good hub is a document that points to many others, and a good authority is a 
> document that many documents point to." 
> [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf]
> This implementation differs from FLINK-2044 by providing for convergence, 
> outputting both hub and authority scores, and completing in half the number 
> of iterations.
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf
> [1] https://en.wikipedia.org/wiki/HITS_algorithm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62850239
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 ---
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Base test class for JDBC Input and Output formats
+ */
+public class JDBCTestBase {
+   
+   public static final String DRIVER_CLASS = 
"org.apache.derby.jdbc.EmbeddedDriver";
+   public static final String DB_URL = "jdbc:derby:memory:ebookshop";
+   public static final String INPUT_TABLE = "books";
+   public static final String OUTPUT_TABLE = "newbooks";
+   public static final String SELECT_ALL_BOOKS = "select * from " + 
INPUT_TABLE;
+   public static final String SELECT_ALL_NEWBOOKS = "select * from " + 
OUTPUT_TABLE;
+   public static final String SELECT_EMPTY = "select * from books WHERE 
QTY < 0";
+   public static final String INSERT_TEMPLATE = "insert into %s (id, 
title, author, price, qty) values (?,?,?,?,?)";
+   public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+   public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
+   
+   protected JDBCInputFormat jdbcInputFormat;
+   protected JDBCOutputFormat jdbcOutputFormat;
+
+   protected static Connection conn;
+
+   public static final Object[][] testData = {
--- End diff --

The record with id 1010 already contains a null field... isn't that 
sufficient?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280129#comment-15280129
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62849325
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala
 ---
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.table
+package org.apache.flink.api.table.expressions
 
-/**
- * General Exception for all errors during table handling.
- */
-class TableException(msg: String) extends RuntimeException(msg)
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
--- End diff --

Could you put this into `exceptions.scala` and document it?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62849325
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala
 ---
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.table
+package org.apache.flink.api.table.expressions
 
-/**
- * General Exception for all errors during table handling.
- */
-class TableException(msg: String) extends RuntimeException(msg)
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
--- End diff --

Could you put this into `exceptions.scala` and document it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62848946
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object TypeCoercion {
--- End diff --

I would put this into the `typeutils` package. The `expressions` package 
should only contain expressions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280120#comment-15280120
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62848946
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object TypeCoercion {
--- End diff --

I would put this into the `typeutils` package. The `expressions` package 
should only contain expressions.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike the execution of a SQL string, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62847383
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
 * @throws IOException
 */
@Override
-   public void open(InputSplit ignored) throws IOException {
+   public void open(InputSplit inputSplit) throws IOException {
+   hasNext = true;
try {
-   establishConnection();
-   statement = dbConn.createStatement(resultSetType, 
resultSetConcurrency);
-   resultSet = statement.executeQuery(query);
+   if (inputSplit != null && parameterValues != null) {
+   for (int i = 0; i < 
parameterValues[inputSplit.getSplitNumber()].length; i++) {
+   Object param = 
parameterValues[inputSplit.getSplitNumber()][i];
+   if (param instanceof String) {
+   statement.setString(i + 1, 
(String) param);
+   } else if (param instanceof Long) {
+   statement.setLong(i + 1, (Long) 
param);
+   } else if (param instanceof Integer) {
+   statement.setInt(i + 1, 
(Integer) param);
+   } else if (param instanceof Double) {
+   statement.setDouble(i + 1, 
(Double) param);
+   } else if (param instanceof Boolean) {
+   statement.setBoolean(i + 1, 
(Boolean) param);
+   } else if (param instanceof Float) {
+   statement.setFloat(i + 1, 
(Float) param);
+   } else if (param instanceof BigDecimal) 
{
+   statement.setBigDecimal(i + 1, 
(BigDecimal) param);
+   } else if (param instanceof Byte) {
+   statement.setByte(i + 1, (Byte) 
param);
+   } else if (param instanceof Short) {
+   statement.setShort(i + 1, 
(Short) param);
+   } else if (param instanceof Date) {
+   statement.setDate(i + 1, (Date) 
param);
+   } else if (param instanceof Time) {
+   statement.setTime(i + 1, (Time) 
param);
+   } else if (param instanceof Timestamp) {
+   statement.setTimestamp(i + 1, 
(Timestamp) param);
+   } else if (param instanceof Array) {
+   statement.setArray(i + 1, 
(Array) param);
+   } else {
+   //extends with other types if 
needed
+   throw new 
IllegalArgumentException("open() failed. Parameter " + i + " of type " + 
param.getClass() + " is not handled (yet)." );
+   }
+   }
+   if (LOG.isDebugEnabled()) {
+   LOG.debug(String.format("Executing '%s' 
with parameters %s", queryTemplate, 
Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
+   }
+   }
+   resultSet = statement.executeQuery();
} catch (SQLException se) {
close();
--- End diff --

I did that in the previous version of this PR 
(https://github.com/apache/flink/pull/1885) but @zentol told me to leave it 
("this is not guaranteed, so please add them back"). From what I checked, @zentol 
was right, wasn't he?
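
For context, a hedged sketch of the cleanup concern being discussed: with plain JDBC and try-with-resources, statements and connections are closed even when the query fails, so no explicit `close()` in a catch block is needed. This is illustrative only; the actual `JDBCInputFormat` keeps its connection and result set open across `open()`/`nextRecord()`/`close()`, so it cannot simply use this pattern.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Minimal sketch, assuming the Derby driver is on the classpath (as in the PR's tests).
public class TryWithResourcesSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:derby:memory:demo;create=true";
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement ps = conn.prepareStatement("VALUES 1");
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                System.out.println(rs.getInt(1));
            }
        } // all three resources are closed here, also if executeQuery() throws
    }
}
```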


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280074#comment-15280074
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62843680
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
 * @param endIndex last character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+SubString(expr, beginIndex, endIndex)
 
   /**
 * Creates a substring of the given string beginning at the given index 
to the end.
 *
 * @param beginIndex first character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+new SubString(expr, beginIndex)
--- End diff --

`new` can be removed.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike the execution of a SQL string, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62843680
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
 * @param endIndex last character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+SubString(expr, beginIndex, endIndex)
 
   /**
 * Creates a substring of the given string beginning at the given index 
to the end.
 *
 * @param beginIndex first character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+new SubString(expr, beginIndex)
--- End diff --

`new` can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280049#comment-15280049
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218447948
  
Hi @fhueske , I've just finished my work, can you give another pass of 
review? Thanks!


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike the execution of a SQL string, which has a separate validation phase before 
> RelNode construction, the Table API lacks a counterpart and its validation is 
> scattered in many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218447948
  
Hi @fhueske , I've just finished my work, can you give another pass of 
review? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-218446355
  
Thanks for the updates @gallenvara. I left a few minor comments. Could you 
please also add the algorithm in the Gelly documentation under "library 
methods"? It should be good to merge after that :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280047#comment-15280047
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1956#issuecomment-218446355
  
Thanks for the updates @gallenvara. I left a few minor comments. Could you 
please also add the algorithm in the Gelly documentation under "library 
methods"? It should be good to merge after that :)


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280042#comment-15280042
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62838798
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represented a page that pointed to 
many other pages, and a good authority
+ * represented a page that was linked by many different hubs. The 
implementation assumes that the two value on
+ * every vertex are the same at the beginning.
+ * 
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
--- End diff --

Can you also please add a comment about the result type? Which tuple field 
is the authority score and which is the hub?


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280040#comment-15280040
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62838726
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represented a page that pointed to 
many other pages, and a good authority
+ * represented a page that was linked by many different hubs. The 
implementation assumes that the two value on
--- End diff --

represented => represents
pointed => points
was linked => is linked
the two value => the two values*


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62838798
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represented a page that pointed to 
many other pages, and a good authority
+ * represented a page that was linked by many different hubs. The 
implementation assumes that the two value on
+ * every vertex are the same at the beginning.
+ * 
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
+ */
--- End diff --

Can you also please add a comment about the result type? Which tuple field 
is the authority score and which is the hub?
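
As a side note, a Flink-free sketch of one HITS round as described in the Javadoc above may help readers. It uses plain Java arrays and made-up data, and it deliberately does not answer which Tuple2 field holds the hub or the authority score in the Gelly implementation, since that is exactly the open question here.

```java
import java.util.Arrays;

// Illustrative only: hubs score the pages they point to, authorities are scored
// by the hubs pointing at them, followed by normalization in every round.
public class HitsSketch {
    public static void main(String[] args) {
        // tiny 3-vertex example: adj[i][j] == true means an edge i -> j
        boolean[][] adj = {
                {false, true,  true},
                {false, false, true},
                {false, false, false}
        };
        int n = adj.length;
        double[] hub = new double[n];
        double[] auth = new double[n];
        Arrays.fill(hub, 1.0);   // same initial value for both scores, as assumed above
        Arrays.fill(auth, 1.0);

        for (int iter = 0; iter < 10; iter++) {
            double[] newAuth = new double[n];
            double[] newHub = new double[n];
            for (int i = 0; i < n; i++) {
                for (int j = 0; j < n; j++) {
                    if (adj[i][j]) {
                        newAuth[j] += hub[i];   // good authorities are pointed to by good hubs
                        newHub[i] += auth[j];   // good hubs point to good authorities
                    }
                }
            }
            auth = normalize(newAuth);
            hub = normalize(newHub);
        }
        System.out.println("hub  = " + Arrays.toString(hub));
        System.out.println("auth = " + Arrays.toString(auth));
    }

    private static double[] normalize(double[] v) {
        double norm = 0;
        for (double x : v) {
            norm += x * x;
        }
        norm = Math.sqrt(norm);
        double[] out = new double[v.length];
        for (int i = 0; i < v.length; i++) {
            out[i] = norm == 0 ? 0 : v[i] / norm;
        }
        return out;
    }
}
```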


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62838726
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This is an implementation of HITS algorithm, using a scatter-gather 
iteration.
+ * The user can define the maximum number of iterations. HITS algorithm is 
determined by two parameters,
+ * hubs and authorities. A good hub represented a page that pointed to 
many other pages, and a good authority
+ * represented a page that was linked by many different hubs. The 
implementation assumes that the two value on
--- End diff --

represented => represents
pointed => points
was linked => is linked
the two value => the two values*


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2044) Implementation of Gelly HITS Algorithm

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280037#comment-15280037
 ] 

ASF GitHub Bot commented on FLINK-2044:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62837591
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data set used for the HITS test program.
+ * If no parameters are given to the program, the default edge data set is 
used.
--- End diff --

I guess you copied this comment from another similar class. The "If no 
parameters given..." refers to Gelly examples, which run with default data if 
no parameters are provided. In this case HITS is implemented as a library 
method, so this comment can be removed. This data is only used for testing as 
far as I can tell :)


> Implementation of Gelly HITS Algorithm
> --
>
> Key: FLINK-2044
> URL: https://issues.apache.org/jira/browse/FLINK-2044
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Ahamd Javid
>Assignee: GaoLun
>Priority: Minor
>
> Implementation of the HITS algorithm in the Gelly API using Java. The feature branch 
> can be found here: (https://github.com/JavidMayar/flink/commits/HITS)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

2016-05-11 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1956#discussion_r62837591
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data set used for the HITS test program.
+ * If no parameters are given to the program, the default edge data set is 
used.
--- End diff --

I guess you copied this comment from another similar class. The "If no 
parameters given..." refers to Gelly examples, which run with default data if 
no parameters are provided. In this case HITS is implemented as a library 
method, so this comment can be removed. This data is only used for testing as 
far as I can tell :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-05-11 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280018#comment-15280018
 ] 

Dawid Wysakowicz commented on FLINK-3848:
-

Sure, I will have a look at the current state and wait for more info, no hurry.

> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> def trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.
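
For illustration, a Flink-free Java sketch of the projection push-down idea described above (class and method names are made up; the actual proposal is the Scala trait shown in the issue):

{code}
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// A toy CSV-like source that only materializes the requested columns.
public class ProjectableCsvSourceSketch {
    private final String[] allFieldNames = {"id", "title", "author", "price", "qty"};
    private String[] projectedFields = allFieldNames;   // default: read all columns

    /** Called by the optimizer-side rule to push the projection into the scan. */
    public void setProjection(String[] fields) {
        this.projectedFields = fields;
    }

    /** Reads one CSV line, keeping only the projected columns. */
    public List<String> readRow(String csvLine) {
        String[] cols = csvLine.split(",");
        List<String> names = Arrays.asList(allFieldNames);
        return Arrays.stream(projectedFields)
                .map(f -> cols[names.indexOf(f)])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ProjectableCsvSourceSketch src = new ProjectableCsvSourceSketch();
        src.setProjection(new String[]{"title", "price"});
        System.out.println(src.readRow("1001,Java for dummies,Tan Ah Teck,11.11,11"));
        // -> [Java for dummies, 11.11]
    }
}
{code}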



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1941#issuecomment-218438515
  
Thanks for the update @fpompermaier. Overall the PR looks good. I added a 
few comments and suggestions. Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62834635
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCFullTest.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.example;
+
+import java.sql.Types;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import 
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
+import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.api.java.io.jdbc.JDBCTestBase;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.junit.Test;
+
+public class JDBCFullTest extends JDBCTestBase {
+   
+   @Test
+   public void test() throws Exception {
--- End diff --

This test does not assert the correctness of the data. It just checks that 
the code runs, not whether the output data is correct.
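
As an illustration of the kind of check being asked for (the helper name and Derby URL are assumptions, not the test's actual code), the output table could be read back after the job and its contents verified, for example:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.junit.Assert;

// Hedged sketch: count the rows written to the output table instead of
// only checking that the job ran without exceptions.
public class VerifyOutputSketch {
    static void assertRowCount(String dbUrl, String table, int expected) throws Exception {
        try (Connection conn = DriverManager.getConnection(dbUrl);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
            Assert.assertTrue(rs.next());
            Assert.assertEquals(expected, rs.getInt(1));
        }
    }
}
```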


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62834552
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
 ---
@@ -19,135 +19,42 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.ResultSet;
 
-import org.junit.Assert;
-
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.flink.api.table.Row;
+import org.junit.Assert;
 import org.junit.Test;
 
-public class JDBCOutputFormatTest {
-   private JDBCInputFormat jdbcInputFormat;
-   private JDBCOutputFormat jdbcOutputFormat;
-
-   private static Connection conn;
-
-   static final Object[][] dbData = {
-   {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-   {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-   {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 
33},
-   {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-   {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-   @BeforeClass
-   public static void setUpClass() throws SQLException {
-   try {
-   System.setProperty("derby.stream.error.field", 
"org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-   prepareDerbyDatabase();
-   } catch (ClassNotFoundException e) {
-   e.printStackTrace();
-   Assert.fail();
-   }
-   }
-
-   private static void prepareDerbyDatabase() throws 
ClassNotFoundException, SQLException {
-   String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-   conn = DriverManager.getConnection(dbURL);
-   createTable("books");
-   createTable("newbooks");
-   insertDataToSQLTables();
-   conn.close();
-   }
-
-   private static void createTable(String tableName) throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
");
-   sqlQueryBuilder.append(tableName);
-   sqlQueryBuilder.append(" (");
-   sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-   sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-   sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-   sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-   Statement stat = conn.createStatement();
-   stat.executeUpdate(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   private static void insertDataToSQLTables() throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-   sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
-   sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
-   sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
-   sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
-   sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
-
-   Statement stat = conn.createStatement();
-   stat.execute(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   @AfterClass
-   public static void tearDownClass() {
-   cleanUpDerbyDatabases();
-   }
-
-   private static void cleanUpDerbyDatabases() {
-   try {
-   String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-   conn = DriverManager.getConnection(dbURL);
-   Statement stat = conn.createStatement();
-   stat.executeUpdate("DROP TABLE books");
-   stat.executeUpdate("DROP TABLE newbooks");
-   stat.close();
-   conn.close();
-   } catch (Exception e) {
-   e.printStackTrace();
-   Assert.fail();
-   }
-   }
-
-   @After
-   public 

[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62834121
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 ---
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Base test class for JDBC Input and Output formats
+ */
+public class JDBCTestBase {
+   
+   public static final String DRIVER_CLASS = 
"org.apache.derby.jdbc.EmbeddedDriver";
+   public static final String DB_URL = "jdbc:derby:memory:ebookshop";
+   public static final String INPUT_TABLE = "books";
+   public static final String OUTPUT_TABLE = "newbooks";
+   public static final String SELECT_ALL_BOOKS = "select * from " + 
INPUT_TABLE;
+   public static final String SELECT_ALL_NEWBOOKS = "select * from " + 
OUTPUT_TABLE;
+   public static final String SELECT_EMPTY = "select * from books WHERE 
QTY < 0";
+   public static final String INSERT_TEMPLATE = "insert into %s (id, 
title, author, price, qty) values (?,?,?,?,?)";
+   public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+   public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
+   
+   protected JDBCInputFormat jdbcInputFormat;
+   protected JDBCOutputFormat jdbcOutputFormat;
+
+   protected static Connection conn;
+
+   public static final Object[][] testData = {
--- End diff --

Add one or two records with `null` values to the test data.
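
For illustration, extra rows of the suggested shape might look like this (the values are made up, not part of the PR):

```java
// Hedged sketch: two additional test records containing null columns.
public class TestDataWithNullsSketch {
    public static final Object[][] extraRowsWithNulls = {
            {1006, "A Koan of Java", null, null, 6},     // null author and price
            {1007, null, "Anonymous", 0.0, null}         // null title and qty
    };

    public static void main(String[] args) {
        System.out.println(extraRowsWithNulls.length + " extra rows with null values");
    }
}
```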


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833701
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
 ---
@@ -19,180 +19,224 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.ResultSet;
 
-
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.core.io.InputSplit;
 import org.junit.Assert;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class JDBCInputFormatTest {
-   JDBCInputFormat jdbcInputFormat;
-
-   static Connection conn;
-
-   static final Object[][] dbData = {
-   {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-   {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-   {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 
33},
-   {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-   {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-   @BeforeClass
-   public static void setUpClass() {
-   try {
-   prepareDerbyDatabase();
-   } catch (Exception e) {
-   Assert.fail();
-   }
-   }
-
-   private static void prepareDerbyDatabase() throws 
ClassNotFoundException, SQLException {
-   System.setProperty("derby.stream.error.field", 
"org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-   String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-   conn = DriverManager.getConnection(dbURL);
-   createTable();
-   insertDataToSQLTable();
-   conn.close();
-   }
-
-   private static void createTable() throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
-   sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-   sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-   sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-   sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-   Statement stat = conn.createStatement();
-   stat.executeUpdate(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   private static void insertDataToSQLTable() throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-   sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
-   sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
-   sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
-   sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
-   sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
-
-   Statement stat = conn.createStatement();
-   stat.execute(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   @AfterClass
-   public static void tearDownClass() {
-   cleanUpDerbyDatabases();
-   }
-
-   private static void cleanUpDerbyDatabases() {
-   try {
-   String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-   conn = DriverManager.getConnection(dbURL);
-   Statement stat = conn.createStatement();
-   stat.executeUpdate("DROP TABLE books");
-   stat.close();
-   conn.close();
-   } catch (Exception e) {
-   e.printStackTrace();
-  

[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833691
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
 ---
@@ -19,180 +19,224 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.ResultSet;
 
-
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.core.io.InputSplit;
 import org.junit.Assert;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class JDBCInputFormatTest {
-   JDBCInputFormat jdbcInputFormat;
-
-   static Connection conn;
-
-   static final Object[][] dbData = {
-   {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-   {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-   {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 
33},
-   {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-   {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-   @BeforeClass
-   public static void setUpClass() {
-   try {
-   prepareDerbyDatabase();
-   } catch (Exception e) {
-   Assert.fail();
-   }
-   }
-
-   private static void prepareDerbyDatabase() throws 
ClassNotFoundException, SQLException {
-   System.setProperty("derby.stream.error.field", 
"org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-   String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-   conn = DriverManager.getConnection(dbURL);
-   createTable();
-   insertDataToSQLTable();
-   conn.close();
-   }
-
-   private static void createTable() throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
-   sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-   sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-   sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-   sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-   Statement stat = conn.createStatement();
-   stat.executeUpdate(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   private static void insertDataToSQLTable() throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-   sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
-   sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
-   sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
-   sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
-   sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
-
-   Statement stat = conn.createStatement();
-   stat.execute(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   @AfterClass
-   public static void tearDownClass() {
-   cleanUpDerbyDatabases();
-   }
-
-   private static void cleanUpDerbyDatabases() {
-   try {
-   String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-   conn = DriverManager.getConnection(dbURL);
-   Statement stat = conn.createStatement();
-   stat.executeUpdate("DROP TABLE books");
-   stat.close();
-   conn.close();
-   } catch (Exception e) {
-   e.printStackTrace();
-  

[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833637
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
 ---
@@ -19,180 +19,224 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.ResultSet;
 
-
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.core.io.InputSplit;
 import org.junit.Assert;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class JDBCInputFormatTest {
-   JDBCInputFormat jdbcInputFormat;
-
-   static Connection conn;
-
-   static final Object[][] dbData = {
-   {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-   {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-   {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 
33},
-   {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-   {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-   @BeforeClass
-   public static void setUpClass() {
-   try {
-   prepareDerbyDatabase();
-   } catch (Exception e) {
-   Assert.fail();
-   }
-   }
-
-   private static void prepareDerbyDatabase() throws 
ClassNotFoundException, SQLException {
-   System.setProperty("derby.stream.error.field", 
"org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-   String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-   conn = DriverManager.getConnection(dbURL);
-   createTable();
-   insertDataToSQLTable();
-   conn.close();
-   }
-
-   private static void createTable() throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
-   sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-   sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-   sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-   sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-   sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-   Statement stat = conn.createStatement();
-   stat.executeUpdate(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   private static void insertDataToSQLTable() throws SQLException {
-   StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-   sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
-   sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
-   sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
-   sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
-   sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
-
-   Statement stat = conn.createStatement();
-   stat.execute(sqlQueryBuilder.toString());
-   stat.close();
-   }
-
-   @AfterClass
-   public static void tearDownClass() {
-   cleanUpDerbyDatabases();
-   }
-
-   private static void cleanUpDerbyDatabases() {
-   try {
-   String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-   Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-   conn = DriverManager.getConnection(dbURL);
-   Statement stat = conn.createStatement();
-   stat.executeUpdate("DROP TABLE books");
-   stat.close();
-   conn.close();
-   } catch (Exception e) {
-   e.printStackTrace();
-  

[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833518
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
 ---
@@ -15,17 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.java.io.jdbc;
+package org.apache.flink.api.java.io.jdbc.split;
 
-import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
 
-@SuppressWarnings("unused")
 /**
- * Utility class to disable derby logging
- */
-public class DerbyUtil {
-   public static final OutputStream DEV_NULL = new OutputStream() {
-   public void write(int b) {
-   }
-   };
+ * 
+ * This interface is used by the {@link JDBCInputFormat} to compute the 
list of parallel queries to run (i.e. splits).
+ * Each query will be parameterized using a row of the matrix provided by 
each {@link ParameterValuesProvider} implementation.
+ * 
+ * */
+public interface ParameterValuesProvider extends Serializable {
--- End diff --

The `ParameterValuesProvider` does not need to be `Serializable`. However, 
all values in the parameter values array must be, otherwise the JdbcInputFormat 
cannot be serialized. So the signature of `getParameterValues()` should be 
adapted to `public Serializable[][] getParameterValues()`.
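
A minimal sketch of the adapted signature (the interface name here is illustrative, not the PR's final code):

```java
import java.io.Serializable;

// The provider itself need not be Serializable, but the parameter values it
// returns must be, so the input format that stores them stays serializable.
public interface ParameterValuesProviderSketch {
    Serializable[][] getParameterValues();
}
```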


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833327
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
 ---
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+/** 
+ * 
+ * This query generator assumes that the query to parameterize contains a 
BETWEEN constraint on a numeric column.
+ * The generated query set will be of size equal to the configured 
fetchSize (apart from the last range),
+ * ranging from the min value up to the max.
+ * 
+ * For example, if there's a table BOOKS with a numeric PK 
id, using a query like:
+ * 
+ *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * 
+ *
+ * you can use this class to automatically generate the parameters of the 
BETWEEN clause,
+ * based on the passed constructor parameters.
+ * 
+ * */
+public class NumericBetweenParametersProvider implements 
ParameterValuesProvider {
+
+   private static final long serialVersionUID = 1L;
+   private long fetchSize;
+   private final long min;
+   private final long max;
+   
+   public NumericBetweenParametersProvider(long fetchSize, long min, long 
max) {
+   this.fetchSize = fetchSize;
+   this.min = min;
+   this.max = max;
+   }
+
+   @Override
+   public Object[][] getParameterValues(){
+   double maxEelemCount = (max - min) + 1;
--- End diff --

Typo in the variable name: `maxEelemCount` should be `maxElemCount`.
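
For illustration, with the variable renamed the method could look roughly like this (a sketch of one possible split computation, not the code under review):

```java
@Override
public Object[][] getParameterValues() {
    long maxElemCount = (max - min) + 1;                      // number of values in [min, max]
    int numSplits = (int) Math.ceil((double) maxElemCount / fetchSize);
    Object[][] parameters = new Object[numSplits][2];
    long start = min;
    for (int i = 0; i < numSplits; i++) {
        long end = Math.min(start + fetchSize - 1, max);      // the last range may be smaller
        parameters[i] = new Object[]{start, end};             // bounds of the BETWEEN clause
        start = end + 1;
    }
    return parameters;
}
```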


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833230
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, 
ClassNotFoundException {
}
}
 
-   private enum SupportedTypes {
-   BOOLEAN,
-   BYTE,
-   SHORT,
-   INTEGER,
-   LONG,
-   STRING,
-   FLOAT,
-   DOUBLE
-   }
-
/**
 * Adds a record to the prepared statement.
 * 
 * When this method is called, the output format is guaranteed to be 
opened.
+* 
+* WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types are specified for the SqlRow
 *
 * @param tuple The records to add to the output.
 * @throws IOException Thrown, if the records could not be added due to 
an I/O problem.
 */
@Override
-   public void writeRecord(OUT tuple) throws IOException {
+   public void writeRecord(Row tuple) throws IOException {
try {
-   if (types == null) {
-   extractTypes(tuple);
+   for (int index = 0; index < tuple.productArity(); 
index++) {
+   if (tuple.productElement(index) == null && 
typesArray != null && typesArray.length > 0) {
+   if (typesArray.length == 
tuple.productArity()) {
+   upload.setNull(index + 1, 
typesArray[index]);
+   } else {
+   LOG.warn("Column SQL types 
array doesn't match arity of SqlRow! Check the passed array...");
+   }
+   } else {
+   //try generic set if no column type 
available
+   //WARNING: this may fail if the JDBC 
driver doesn't handle null correctly
+   upload.setObject(index + 1, 
tuple.productElement(index));
--- End diff --

Are there any drawbacks to using the generic `setObject()` method compared 
to the typed methods? If so, we should use the typed methods whenever the types are specified.
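
For example, a helper along these lines could dispatch on the SQL type when it is known and only fall back to `setObject()` otherwise (a sketch; the method name and the set of handled types are just placeholders, assuming `java.sql.PreparedStatement`, `java.sql.SQLException` and `java.sql.Types` are imported):

```java
// Illustrative helper: prefer typed setters when the java.sql.Types code is known.
private static void setField(PreparedStatement stmt, int pos, int sqlType, Object value) throws SQLException {
    switch (sqlType) {
        case Types.INTEGER:
            stmt.setInt(pos, (Integer) value);
            break;
        case Types.BIGINT:
            stmt.setLong(pos, (Long) value);
            break;
        case Types.DOUBLE:
            stmt.setDouble(pos, (Double) value);
            break;
        case Types.VARCHAR:
            stmt.setString(pos, (String) value);
            break;
        // ... further types as needed ...
        default:
            stmt.setObject(pos, value);  // generic fallback when no mapping is known
    }
}
```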


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833149
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -95,32 +95,32 @@ private void establishConnection() throws SQLException, 
ClassNotFoundException {
}
}
 
-   private enum SupportedTypes {
-   BOOLEAN,
-   BYTE,
-   SHORT,
-   INTEGER,
-   LONG,
-   STRING,
-   FLOAT,
-   DOUBLE
-   }
-
/**
 * Adds a record to the prepared statement.
 * 
 * When this method is called, the output format is guaranteed to be 
opened.
+* 
+* WARNING: this may fail if the JDBC driver doesn't handle null correctly and no column types are specified for the SqlRow
 *
 * @param tuple The records to add to the output.
 * @throws IOException Thrown, if the records could not be added due to 
an I/O problem.
 */
@Override
-   public void writeRecord(OUT tuple) throws IOException {
+   public void writeRecord(Row tuple) throws IOException {
try {
-   if (types == null) {
-   extractTypes(tuple);
+   for (int index = 0; index < tuple.productArity(); 
index++) {
+   if (tuple.productElement(index) == null && 
typesArray != null && typesArray.length > 0) {
+   if (typesArray.length == 
tuple.productArity()) {
--- End diff --

Move this check out of the loop. We do not need to check this for every 
attribute. 
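
For example (sketch, reusing the names from the diff above):

```java
// Check the arity mismatch once per record instead of once per field.
boolean typesKnown = typesArray != null && typesArray.length > 0;
if (typesKnown && typesArray.length != tuple.productArity()) {
    LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
    typesKnown = false;
}
for (int index = 0; index < tuple.productArity(); index++) {
    if (tuple.productElement(index) == null && typesKnown) {
        upload.setNull(index + 1, typesArray[index]);
    } else {
        // may fail if the JDBC driver doesn't handle null correctly
        upload.setObject(index + 1, tuple.productElement(index));
    }
}
```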


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833050
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -352,6 +318,10 @@ public JDBCInputFormatBuilder 
setResultSetConcurrency(int resultSetConcurrency)
format.resultSetConcurrency = resultSetConcurrency;
return this;
}
+   public JDBCInputFormatBuilder 
setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
--- End diff --

Add an empty line above this method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62833018
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -314,6 +279,7 @@ public static JDBCInputFormatBuilder 
buildJDBCInputFormat() {
 
public JDBCInputFormatBuilder() {
this.format = new JDBCInputFormat();
+   //use the "Firehose cursor" (see 
http://jtds.sourceforge.net/resultSets.html)
--- End diff --

Can you explain what this comment is about?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832955
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
 * @throws java.io.IOException
 */
@Override
-   public OUT nextRecord(OUT tuple) throws IOException {
+   public Row nextRecord(Row row) throws IOException {
try {
-   resultSet.next();
-   if (columnTypes == null) {
-   extractTypes(tuple);
+   hasNext = resultSet.next();
+   if (!hasNext) {
+   return null;
+   }
+   try {
+   //This throws a NPE when the TypeInfo is not 
passed to the InputFormat,
+   //i.e. KryoSerializer used to generate the 
passed row
+   row.productArity();
+   } catch(NullPointerException npe) {
+   ResultSetMetaData resultSetMetaData = 
resultSet.getMetaData();
+   row = new 
Row(resultSetMetaData.getColumnCount());
+   LOG.warn("TypeInfo not provided to the 
InputFormat. Row cannot be reused.");
}
-   addValue(tuple);
-   return tuple;
+   for (int pos = 0; pos < row.productArity(); pos++) {
+   row.setField(pos, resultSet.getObject(pos + 1));
+   }
+   return row;
} catch (SQLException se) {
close();
--- End diff --

Let the data source task call `close()`; the InputFormat should not close itself here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832846
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
 * @throws java.io.IOException
 */
@Override
-   public OUT nextRecord(OUT tuple) throws IOException {
+   public Row nextRecord(Row row) throws IOException {
try {
-   resultSet.next();
-   if (columnTypes == null) {
-   extractTypes(tuple);
+   hasNext = resultSet.next();
+   if (!hasNext) {
+   return null;
+   }
+   try {
+   //This throws a NPE when the TypeInfo is not 
passed to the InputFormat,
+   //i.e. KryoSerializer used to generate the 
passed row
+   row.productArity();
--- End diff --

Why not check `row == null` instead of putting logic into the `catch` 
branch?
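
i.e. something along these lines (sketch):

```java
// Create a new Row only if none was passed in for reuse.
if (row == null) {
    ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
    row = new Row(resultSetMetaData.getColumnCount());
    LOG.warn("TypeInfo not provided to the InputFormat. Row cannot be reused.");
}
```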


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832789
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -157,14 +216,25 @@ public boolean reachedEnd() throws IOException {
 * @throws java.io.IOException
 */
@Override
-   public OUT nextRecord(OUT tuple) throws IOException {
+   public Row nextRecord(Row row) throws IOException {
try {
-   resultSet.next();
-   if (columnTypes == null) {
-   extractTypes(tuple);
+   hasNext = resultSet.next();
--- End diff --

If `resultSet.next()` is called at the end of `open()` (as suggested), we should call it 
again at the end of `nextRecord()` rather than at the beginning.
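
i.e. roughly this pattern (a sketch, assuming `open()` pre-fetches the first record as suggested in the other comment):

```java
// At the end of open(): pre-fetch so reachedEnd() is meaningful before the first nextRecord() call.
resultSet = statement.executeQuery();
hasNext = resultSet.next();

// In nextRecord(): emit the current record, then advance the cursor for the next call.
if (!hasNext) {
    return null;
}
for (int pos = 0; pos < row.productArity(); pos++) {
    row.setField(pos, resultSet.getObject(pos + 1));
}
hasNext = resultSet.next();
return row;
```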


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832683
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -113,19 +192,7 @@ public void close() throws IOException {
try {
resultSet.close();
--- End diff --

Check `resultSet == null` instead of catching an NPE?
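
e.g. (sketch; the same applies to `statement` and `dbConn` below):

```java
// Guard with a null check instead of swallowing a NullPointerException.
if (resultSet != null) {
    try {
        resultSet.close();
    } catch (SQLException se) {
        LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
    }
}
```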


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832699
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -113,19 +192,7 @@ public void close() throws IOException {
try {
resultSet.close();
} catch (SQLException se) {
-   LOG.info("Inputformat couldn't be closed - " + 
se.getMessage());
-   } catch (NullPointerException npe) {
-   }
-   try {
-   statement.close();
--- End diff --

Add a null check here as well, instead of catching an NPE.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832588
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
 * @throws IOException
 */
@Override
-   public void open(InputSplit ignored) throws IOException {
+   public void open(InputSplit inputSplit) throws IOException {
+   hasNext = true;
--- End diff --

Call `resultSet.next()` at the end of `open()` and only set `hasNext = 
true` if the split actually contains a record.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832479
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
 * @throws IOException
 */
@Override
-   public void open(InputSplit ignored) throws IOException {
+   public void open(InputSplit inputSplit) throws IOException {
+   hasNext = true;
try {
-   establishConnection();
-   statement = dbConn.createStatement(resultSetType, 
resultSetConcurrency);
-   resultSet = statement.executeQuery(query);
+   if (inputSplit != null && parameterValues != null) {
+   for (int i = 0; i < 
parameterValues[inputSplit.getSplitNumber()].length; i++) {
+   Object param = 
parameterValues[inputSplit.getSplitNumber()][i];
+   if (param instanceof String) {
+   statement.setString(i + 1, 
(String) param);
+   } else if (param instanceof Long) {
+   statement.setLong(i + 1, (Long) 
param);
+   } else if (param instanceof Integer) {
+   statement.setInt(i + 1, 
(Integer) param);
+   } else if (param instanceof Double) {
+   statement.setDouble(i + 1, 
(Double) param);
+   } else if (param instanceof Boolean) {
+   statement.setBoolean(i + 1, 
(Boolean) param);
+   } else if (param instanceof Float) {
+   statement.setFloat(i + 1, 
(Float) param);
+   } else if (param instanceof BigDecimal) 
{
+   statement.setBigDecimal(i + 1, 
(BigDecimal) param);
+   } else if (param instanceof Byte) {
+   statement.setByte(i + 1, (Byte) 
param);
+   } else if (param instanceof Short) {
+   statement.setShort(i + 1, 
(Short) param);
+   } else if (param instanceof Date) {
+   statement.setDate(i + 1, (Date) 
param);
+   } else if (param instanceof Time) {
+   statement.setTime(i + 1, (Time) 
param);
+   } else if (param instanceof Timestamp) {
+   statement.setTimestamp(i + 1, 
(Timestamp) param);
+   } else if (param instanceof Array) {
+   statement.setArray(i + 1, 
(Array) param);
+   } else {
+   //extends with other types if 
needed
+   throw new 
IllegalArgumentException("open() failed. Parameter " + i + " of type " + 
param.getClass() + " is not handled (yet)." );
+   }
+   }
+   if (LOG.isDebugEnabled()) {
+   LOG.debug(String.format("Executing '%s' 
with parameters %s", queryTemplate, 
Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
+   }
+   }
+   resultSet = statement.executeQuery();
} catch (SQLException se) {
close();
--- End diff --

No need to call `close()` here. This should be done by the object that manages 
the InputFormat's life cycle.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832266
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -19,59 +19,112 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
 
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.table.Row;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.NullValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * InputFormat to read data from a database and generate tuples.
+ * InputFormat to read data from a database and generate Rows.
  * The InputFormat has to be configured using the supplied 
InputFormatBuilder.
  * 
- * @param <OUT>
- * @see Tuple
+ * In order to query the JDBC source in parallel, you need to provide a 
parameterized
+ * query template (i.e. a valid {@link PreparedStatement}) and a {@link 
ParameterValuesProvider} 
+ * which provides binding values for the query parameters.
+ * 
+ * @see Row
+ * @see ParameterValuesProvider
+ * @see PreparedStatement
  * @see DriverManager
  */
-public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
-   private static final long serialVersionUID = 1L;
+public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> {
 
+   private static final long serialVersionUID = 1L;
private static final Logger LOG = 
LoggerFactory.getLogger(JDBCInputFormat.class);
 
private String username;
private String password;
private String drivername;
private String dbURL;
-   private String query;
+   private String queryTemplate;
private int resultSetType;
private int resultSetConcurrency;
 
private transient Connection dbConn;
-   private transient Statement statement;
+   private transient PreparedStatement statement;
private transient ResultSet resultSet;
-
-   private int[] columnTypes = null;
-
+   
+   private boolean hasNext = true;
+   private Object[][] parameterValues;
+   
public JDBCInputFormat() {
}
 
@Override
public void configure(Configuration parameters) {
+   //do nothing here
+   }
+   
+   @Override
+   public void openInputFormat() {
+   //called once per inputFormat (on open)
+   try {
+   Class.forName(drivername);
+   if (username == null) {
+   dbConn = DriverManager.getConnection(dbURL);
+   } else {
+   dbConn = DriverManager.getConnection(dbURL, 
username, password);
+   }
+   statement = dbConn.prepareStatement(queryTemplate, 
resultSetType, resultSetConcurrency);
+   } catch (SQLException se) {
+   throw new IllegalArgumentException("open() failed." + 
se.getMessage(), se);
+   } catch (ClassNotFoundException cnfe) {
+   throw new IllegalArgumentException("JDBC-Class not 
found. - " + cnfe.getMessage(), cnfe);
+   }
+   }
+   
+   @Override
+   public void closeInputFormat() {
+   //called once per inputFormat (on close)
+   try {
+   statement.close();
+   } catch (SQLException se) {
+   LOG.info("Inputformat Statement couldn't be closed - " 
+ se.getMessage());
+   } catch (NullPointerException npe) {
--- End diff --

Check for `statement == null` instead of catching a NPE.



[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832299
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -19,59 +19,112 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
 
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.table.Row;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.NullValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * InputFormat to read data from a database and generate tuples.
+ * InputFormat to read data from a database and generate Rows.
  * The InputFormat has to be configured using the supplied 
InputFormatBuilder.
  * 
- * @param <OUT>
- * @see Tuple
+ * In order to query the JDBC source in parallel, you need to provide a 
parameterized
+ * query template (i.e. a valid {@link PreparedStatement}) and a {@link 
ParameterValuesProvider} 
+ * which provides binding values for the query parameters.
+ * 
+ * @see Row
+ * @see ParameterValuesProvider
+ * @see PreparedStatement
  * @see DriverManager
  */
-public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
-   private static final long serialVersionUID = 1L;
+public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> {
 
+   private static final long serialVersionUID = 1L;
private static final Logger LOG = 
LoggerFactory.getLogger(JDBCInputFormat.class);
 
private String username;
private String password;
private String drivername;
private String dbURL;
-   private String query;
+   private String queryTemplate;
private int resultSetType;
private int resultSetConcurrency;
 
private transient Connection dbConn;
-   private transient Statement statement;
+   private transient PreparedStatement statement;
private transient ResultSet resultSet;
-
-   private int[] columnTypes = null;
-
+   
+   private boolean hasNext = true;
+   private Object[][] parameterValues;
+   
public JDBCInputFormat() {
}
 
@Override
public void configure(Configuration parameters) {
+   //do nothing here
+   }
+   
+   @Override
+   public void openInputFormat() {
+   //called once per inputFormat (on open)
+   try {
+   Class.forName(drivername);
+   if (username == null) {
+   dbConn = DriverManager.getConnection(dbURL);
+   } else {
+   dbConn = DriverManager.getConnection(dbURL, 
username, password);
+   }
+   statement = dbConn.prepareStatement(queryTemplate, 
resultSetType, resultSetConcurrency);
+   } catch (SQLException se) {
+   throw new IllegalArgumentException("open() failed." + 
se.getMessage(), se);
+   } catch (ClassNotFoundException cnfe) {
+   throw new IllegalArgumentException("JDBC-Class not 
found. - " + cnfe.getMessage(), cnfe);
+   }
+   }
+   
+   @Override
+   public void closeInputFormat() {
+   //called once per inputFormat (on close)
+   try {
+   statement.close();
+   } catch (SQLException se) {
+   LOG.info("Inputformat Statement couldn't be closed - " 
+ se.getMessage());
+   } catch (NullPointerException npe) {
+   } finally {
+   statement = null;
+   }
+   
+   try {
+   

[GitHub] flink pull request: Flink 3750 fixed

2016-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1941#discussion_r62832218
  
--- Diff: 
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 ---
@@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
 * @throws IOException
 */
@Override
-   public void open(InputSplit ignored) throws IOException {
+   public void open(InputSplit inputSplit) throws IOException {
--- End diff --

The JavaDocs need to be updated for the new parameter name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

