[jira] [Commented] (FLINK-5266) Eagerly project unused fields when selecting aggregation fields

2016-12-05 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724703#comment-15724703
 ] 

Fabian Hueske commented on FLINK-5266:
--

This looks very similar to FLINK-5220 and might be a duplicate.
Can you check if the pending PR for FLINK-5220 
(https://github.com/apache/flink/pull/2923) solves the issue?

Thanks, Fabian

> Eagerly project unused fields when selecting aggregation fields
> ---
>
> Key: FLINK-5266
> URL: https://issues.apache.org/jira/browse/FLINK-5266
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> When we call a table's {{select}} method and it contains aggregations, we 
> currently project fields after the aggregation. It would be better to project 
> unused fields away before the aggregation, which also leaves the opportunity 
> to push the projection into the scan.
> For example, the current logical plan of a simple query:
> {code}
> table.select('a.sum as 's, 'a.max)
> {code}
> is
> {code}
> LogicalProject(s=[$0], TMP_2=[$1])
>   LogicalAggregate(group=[{}], TMP_0=[SUM($5)], TMP_1=[MAX($5)])
> LogicalTableScan(table=[[supplier]])
> {code}
> It would be better if we projected unused fields right after the scan, like 
> this:
> {code}
> LogicalProject(s=[$0], EXPR$1=[$0])
>   LogicalAggregate(group=[{}], EXPR$1=[SUM($0)])
> LogicalProject(a=[$5])
>   LogicalTableScan(table=[[supplier]])
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5221) Checkpointed workless in Window Operator

2016-12-05 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724302#comment-15724302
 ] 

shijinkui commented on FLINK-5221:
--

The problem is that the Java low-level API uses a class from the Scala 
high-level API, but the Java API has no dependency on the Scala API. Both must 
be on the same classpath, otherwise a class-not-found exception is thrown at 
runtime.

We could add a new interface method `getUserFunction` that returns the 
userFunction object.

More generally, which of the Flink Java API and Scala API is more suitable as 
the low-level API? I think a Scala trait is better than a Java interface.
[~till.rohrmann], do we have a plan for a unified API in the future, for 
example in 2.0?
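
For illustration, a minimal sketch of that idea (the accessor interface is 
hypothetical, and the call site is simplified from AbstractUdfStreamOperator's 
snapshot path):
{code}
// Hypothetical accessor that wrappers such as ScalaWindowFunctionWrapper
// could implement to expose the function they delegate to.
interface WrapperWithUserFunction {
    Function getUserFunction();
}

// Unwrap before the instanceof check, so a wrapped Scala window function
// that implements Checkpointed is still detected:
Function function = userFunction;
while (function instanceof WrapperWithUserFunction) {
    function = ((WrapperWithUserFunction) function).getUserFunction();
}
if (function instanceof Checkpointed) {
    // snapshot the user-defined state of the innermost function
    Serializable state = ((Checkpointed<?>) function).snapshotState(checkpointId, timestamp);
    // persist 'state' with the operator's checkpoint
}
{code}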

> Checkpointed workless in Window Operator
> 
>
> Key: FLINK-5221
> URL: https://issues.apache.org/jira/browse/FLINK-5221
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.3
> Environment: SUSE
>Reporter: Syinchwun Leo
>  Labels: windows
> Fix For: 1.2.0
>
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> When a window operator makes a checkpoint like this:
> class WindowStatistic extends WindowFunction[Event, Int, Tuple, TimeWindow] 
> with Checkpointed[Option[List[Event]]] {
> override def apply() 
> override def snapshotState()...
> override def restoreState()
> }
> the window operator does not invoke the user-defined "snapshotState()". In 
> debug mode, line 123 in AbstractUdfStreamOperator.java returns false when 
> checking whether the window function is a Checkpointed instance, so the 
> user-defined state is not snapshotted. I think something is wrong with the 
> userFunction variable: it is a ScalaWindowFunctionWrapper object, so the check 
> cannot detect whether the user-defined window function implements the 
> Checkpointed interface. The user-defined window function is actually kept in 
> the "func" field of userFunction. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5247) Setting allowedLateness to a non-zero value should throw exception for processing-time windows

2016-12-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724298#comment-15724298
 ] 

Aljoscha Krettek commented on FLINK-5247:
-

I see. Thanks for looking into this!

I actually have a fix for this (ignore allowedLateness for processing time) 
buried in this PR: 
https://github.com/apache/flink/pull/2572/files#diff-408a499e1a35840c52e29b7ccab866b1R525
 (You can look at WindowOperator.cleanupTime()).

I think it would be better to ignore setting an allowed lateness for processing 
time. But you're right that the checks should be fixed.
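
A minimal sketch of that behavior, assuming the window assigner can report 
whether it is event-time based (simplified from the method referenced above):
{code}
// Cleanup time for a window: only event-time windows extend their lifetime
// by the allowed lateness; processing-time windows ignore it.
private long cleanupTime(TimeWindow window) {
    if (windowAssigner.isEventTime()) {
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        // guard against overflow
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}
{code}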

> Setting allowedLateness to a non-zero value should throw exception for 
> processing-time windows
> --
>
> Key: FLINK-5247
> URL: https://issues.apache.org/jira/browse/FLINK-5247
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0, 1.1.1, 1.1.2, 1.1.3
>Reporter: Rohit Agarwal
>
> Related to FLINK-3714 and FLINK-4239



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5221) Checkpointed workless in Window Operator

2016-12-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724281#comment-15724281
 ] 

Aljoscha Krettek commented on FLINK-5221:
-

FLINK-5250 would be the general fix for this.

> Checkpointed workless in Window Operator
> 
>
> Key: FLINK-5221
> URL: https://issues.apache.org/jira/browse/FLINK-5221
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.1.3
> Environment: SUSE
>Reporter: Syinchwun Leo
>  Labels: windows
> Fix For: 1.2.0
>
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> When a window operator makes a checkpoint like this:
> class WindowStatistic extends WindowFunction[Event, Int, Tuple, TimeWindow] 
> with Checkpointed[Option[List[Event]]] {
> override def apply() 
> override def snapshotState()...
> override def restoreState()
> }
> the window operator does not invoke the user-defined "snapshotState()". In 
> debug mode, line 123 in AbstractUdfStreamOperator.java returns false when 
> checking whether the window function is a Checkpointed instance, so the 
> user-defined state is not snapshotted. I think something is wrong with the 
> userFunction variable: it is a ScalaWindowFunctionWrapper object, so the check 
> cannot detect whether the user-defined window function implements the 
> Checkpointed interface. The user-defined window function is actually kept in 
> the "func" field of userFunction. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724190#comment-15724190
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2653
  
Hi @fhueske @twalthr, I have updated the PR. Please review it again when 
you are available.


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Blocker
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful in 
> some cases, such as looking up rows in HBase by rowkey and returning one or 
> more rows.
> Adding a user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because 
> of Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Flink will choose the best matching eval 
> method to call according to parameter types and count.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>   public Word(String word, Integer length) {
> this.word = word;
> this.length = length;
>   }
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2653: [FLINK-4469] [table] Add support for user defined table f...

2016-12-05 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2653
  
Hi @fhueske @twalthr, I have updated the PR. Please review it again when 
you are available.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-4930) Implement FLIP-6 YARN client

2016-12-05 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-4930:
---

Assignee: shuai.xu

> Implement FLIP-6 YARN client
> 
>
> Key: FLINK-4930
> URL: https://issues.apache.org/jira/browse/FLINK-4930
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>Assignee: shuai.xu
>
> The FLIP-6 YARN client can follow parts of the existing YARN client.
> The main difference is that it does not wait for the cluster to be fully 
> started and for all TaskManagers to register. Its main steps are:
>   - Set up all configurations and environment variables
>   - Set up the resources: Flink jar, utility jars (logging), user jar
>   - Set up attached tokens / certificates
>   - Submit the Yarn application
>   - Listen for leader / attempt to connect to the JobManager to subscribe to 
> updates
>   - Integration with the Flink CLI (command line interface)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724057#comment-15724057
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @tonycox , I want to apologize first for opening another "project 
pushdown" issue while this one is still active. We actually came up with some 
new ideas and designs compared to the original design here, so we thought it 
would be easier to open another pull request and discuss over the code. 

But I noticed that this PR is now becoming quite similar to our design 
(with only some slight differences), and even some core code was copied from our 
pull request. I think we should work out a plan to merge our work together. 

One small remark: I think it is pointless to make CsvTableSource a 
ProjectableTableSource, since it simply cannot avoid reading the extra fields in 
the first place. 
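
For illustration, a minimal Java sketch of a source honoring such a projection 
(the interface mirrors the Scala trait quoted below; the class and method names 
are hypothetical):
{code}
// Java rendering of the proposed trait:
interface ProjectableTableSource {
    void setProjection(String[] fields);
}

// A columnar source can skip non-projected columns entirely at scan time.
class OrcTableSource implements ProjectableTableSource {
    private String[] projectedFields; // null means "read all fields"

    @Override
    public void setProjection(String[] fields) {
        // called by the optimizer rule when a projection is pushed down
        this.projectedFields = fields;
    }
    // The scan then reads only 'projectedFields'. A CSV source, by contrast,
    // still has to read and split every full line, which is the point above.
}
{code}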



> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows:
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-05 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @tonycox , I want to apologize first for opening another "project 
pushdown" issue while this one is still active. We actually came up with some 
new ideas and designs compared to the original design here, so we thought it 
would be easier to open another pull request and discuss over the code. 

But I noticed that this PR is now becoming quite similar to our design 
(with only some slight differences), and even some core code was copied from our 
pull request. I think we should work out a plan to merge our work together. 

One small remark: I think it is pointless to make CsvTableSource a 
ProjectableTableSource, since it simply cannot avoid reading the extra fields in 
the first place. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5266) Eagerly project unused fields when selecting aggregation fields

2016-12-05 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-5266:
--
Component/s: Table API & SQL

> Eagerly project unused fields when selecting aggregation fields
> ---
>
> Key: FLINK-5266
> URL: https://issues.apache.org/jira/browse/FLINK-5266
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> When we call a table's {{select}} method and it contains aggregations, we 
> currently project fields after the aggregation. It would be better to project 
> unused fields away before the aggregation, which also leaves the opportunity 
> to push the projection into the scan.
> For example, the current logical plan of a simple query:
> {code}
> table.select('a.sum as 's, 'a.max)
> {code}
> is
> {code}
> LogicalProject(s=[$0], TMP_2=[$1])
>   LogicalAggregate(group=[{}], TMP_0=[SUM($5)], TMP_1=[MAX($5)])
> LogicalTableScan(table=[[supplier]])
> {code}
> It would be better if we projected unused fields right after the scan, like 
> this:
> {code}
> LogicalProject(s=[$0], EXPR$1=[$0])
>   LogicalAggregate(group=[{}], EXPR$1=[SUM($0)])
> LogicalProject(a=[$5])
>   LogicalTableScan(table=[[supplier]])
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5266) Eagerly project unused fields when selecting aggregation fields

2016-12-05 Thread Kurt Young (JIRA)
Kurt Young created FLINK-5266:
-

 Summary: Eagerly project unused fields when selecting aggregation 
fields
 Key: FLINK-5266
 URL: https://issues.apache.org/jira/browse/FLINK-5266
 Project: Flink
  Issue Type: Improvement
Reporter: Kurt Young
Assignee: Kurt Young


When we call a table's {{select}} method and it contains aggregations, we 
currently project fields after the aggregation. It would be better to project 
unused fields away before the aggregation, which also leaves the opportunity to 
push the projection into the scan.

For example, the current logical plan of a simple query:
{code}
table.select('a.sum as 's, 'a.max)
{code}
is
{code}
LogicalProject(s=[$0], TMP_2=[$1])
  LogicalAggregate(group=[{}], TMP_0=[SUM($5)], TMP_1=[MAX($5)])
LogicalTableScan(table=[[supplier]])
{code}

It would be better if we projected unused fields right after the scan, like 
this:
{code}
LogicalProject(s=[$0], EXPR$1=[$0])
  LogicalAggregate(group=[{}], EXPR$1=[SUM($0)])
LogicalProject(a=[$5])
  LogicalTableScan(table=[[supplier]])
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5002) Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers

2016-12-05 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723962#comment-15723962
 ] 

Ted Yu commented on FLINK-5002:
---

Can a committer review, please?

> Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers
> -
>
> Key: FLINK-5002
> URL: https://issues.apache.org/jira/browse/FLINK-5002
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Roman Maier
>Priority: Minor
>  Labels: easyfix, starter
>
> {code}
>   public int getNumberOfUsedBuffers() {
> return numberOfRequestedMemorySegments - availableMemorySegments.size();
>   }
> {code}
> Access to availableMemorySegments should be protected with proper 
> synchronization, as the other methods do.
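
A minimal sketch of the suggested fix, assuming availableMemorySegments is the 
monitor that the other LocalBufferPool methods synchronize on:
{code}
public int getNumberOfUsedBuffers() {
    synchronized (availableMemorySegments) {
        return numberOfRequestedMemorySegments - availableMemorySegments.size();
    }
}
{code}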



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2949: [FLINK-5257] [table] display optimized logical pla...

2016-12-05 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2949

[FLINK-5257] [table] display optimized logical plan when explaining table



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5257

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2949


commit d7a0e3300ad9a432ef3994702b7c81f2701521a4
Author: Kurt Young 
Date:   2016-12-06T01:23:22Z

[FLINK-5257] [table] display optimized logical plan when explaining table




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5257) Display optimized logical plan when explaining table

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723954#comment-15723954
 ] 

ASF GitHub Bot commented on FLINK-5257:
---

GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2949

[FLINK-5257] [table] display optimized logical plan when explaining table



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5257

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2949


commit d7a0e3300ad9a432ef3994702b7c81f2701521a4
Author: Kurt Young 
Date:   2016-12-06T01:23:22Z

[FLINK-5257] [table] display optimized logical plan when explaining table




> Display optimized logical plan when explaining table
> 
>
> Key: FLINK-5257
> URL: https://issues.apache.org/jira/browse/FLINK-5257
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>Priority: Minor
>
> Currently when we use {{BatchTableEnvironment}} or {{StreamTableEnvironment}} 
> to explain a table, it only prints out the "Abstract Syntax Tree" and the 
> "Physical Execution Plan". It would be nice to have the "Optimized Logical 
> Plan" printed as well. 
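
For reference, a minimal sketch of how the explained output is obtained (Java 
Table API; assumes a table named MyTable was registered beforehand, imports 
omitted):
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
Table table = tEnv.sql("SELECT a, SUM(b) FROM MyTable GROUP BY a");
// explain() currently returns the "Abstract Syntax Tree" and the "Physical
// Execution Plan"; this issue adds the "Optimized Logical Plan" in between.
System.out.println(tEnv.explain(table));
{code}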



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5091) Formalize the AppMaster environment for docker compatibility

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723724#comment-15723724
 ] 

ASF GitHub Bot commented on FLINK-5091:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2915


> Formalize the AppMaster environment for docker compatibility
> --
>
> Key: FLINK-5091
> URL: https://issues.apache.org/jira/browse/FLINK-5091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
> Fix For: 1.2.0
>
>
> For scenarios where the AppMaster is launched from a docker image, it would 
> be ideal to use the installed Flink rather than rely on a special file layout 
> in the sandbox directory.
> This is related to DCOS integration, which (in 1.2) will launch the AppMaster 
> via Marathon (as a top-level DCOS service).  The existing code assumed that 
> only the dispatcher (coming in 1.3) would launch the AppMaster.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...

2016-12-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2915


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5247) Setting allowedLateness to a non-zero value should throw exception for processing-time windows

2016-12-05 Thread Rohit Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723552#comment-15723552
 ] 

Rohit Agarwal commented on FLINK-5247:
--

The existing code already had provisions to throw an exception when 
allowedLateness is set to a non-zero value for processing-time windows:
{code}
throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
{code}
I created this JIRA because that existing code was not working as intended: 
https://github.com/apache/flink/pull/2929/files

Currently, if you set allowedLateness to a non-zero value for processing-time 
windows, Flink updates allowedLateness to that value. (This probably results in 
delayed window purging.)

We could make the allowedLateness method a no-op for processing-time windows; I 
didn't do that because, judging from the existing code, that did not seem to be 
the intention.

> Setting allowedLateness to a non-zero value should throw exception for 
> processing-time windows
> --
>
> Key: FLINK-5247
> URL: https://issues.apache.org/jira/browse/FLINK-5247
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0, 1.1.1, 1.1.2, 1.1.3
>Reporter: Rohit Agarwal
>
> Related to FLINK-3714 and FLINK-4239



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5265) Introduce state handle replication mode for CheckpointCoordinator

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723325#comment-15723325
 ] 

ASF GitHub Bot commented on FLINK-5265:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2948
  
CC @aljoscha @StephanEwen 


> Introduce state handle replication mode for CheckpointCoordinator
> -
>
> Key: FLINK-5265
> URL: https://issues.apache.org/jira/browse/FLINK-5265
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Currently, the {{CheckpointCoordinator}} only supports repartitioning of 
> {{OperatorStateHandle}}s based on a split-and-distribute strategy. For future 
> state types, such as broadcast or union state, we need a different 
> repartitioning method that allows for replicating state handles to all 
> subtasks.
> This is the first step on the way to implementing broadcast and union states.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2948: [FLINK-5265] Introduce state handle replication mode for ...

2016-12-05 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2948
  
CC @aljoscha @StephanEwen 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-12-05 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723319#comment-15723319
 ] 

Maximilian Michels commented on FLINK-2821:
---

Thanks for testing! The PR assumes that {{jobmanager.rpc.address}} is 
resolvable in order to verify that it is a valid address. It doesn't use the 
resolved address. I understand that this is an assumption we have to move away 
from because there may be environments where the specified hostname is not 
resolvable from within the JobManager. There are some test cases relying on 
this behavior which I'll have to adapt.

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to match exactly.
> As pointed out here, cases where users have complained about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described there: messages are sent to the proxy URL, but 
> the receiver recognizes only the original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but it seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway because it affects our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-12-05 Thread Philipp von dem Bussche (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723269#comment-15723269
 ] 

Philipp von dem Bussche commented on FLINK-2821:


Hi [~mxm], I wanted to give this a try but I am not sure whether I am testing it 
correctly.
Do I just have to set jobmanager.rpc.address to the hostname that will be 
used for access from outside?
I tried to use a name that is not resolvable on the host itself, and that fails, 
but this is on my local Docker environment; it should be different as soon 
as I move this to Rancher.
Thanks

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to match exactly.
> As pointed out here, cases where users have complained about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described there: messages are sent to the proxy URL, but 
> the receiver recognizes only the original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but it seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway because it affects our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5265) Introduce state handle replication mode for CheckpointCoordinator

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723255#comment-15723255
 ] 

ASF GitHub Bot commented on FLINK-5265:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2948

[FLINK-5265] Introduce state handle replication mode for 
CheckpointCoordinator

Currently, the ``CheckpointCoordinator`` only supports repartitioning of 
``OperatorStateHandle``s based on a split-and-distribute strategy. For future 
state types, such as broadcast or union state, we need a different 
repartitioning method that allows for replicating state handles to all subtasks.

This functionality is introduced with this PR and represents the first step 
on the way to implementing broadcast and union states.
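
A minimal sketch of the replication mode next to the existing strategy (the 
interface shape is illustrative, not the exact Flink API):
{code}
import java.util.ArrayList;
import java.util.List;

// Hypothetical repartitioner contract: given the handles of the previous run,
// produce the handles each of the 'parallelism' new subtasks should receive.
interface StateRepartitioner<H> {
    List<List<H>> repartition(List<H> previousHandles, int parallelism);
}

// Replication mode: every subtask receives all handles (broadcast/union state).
class ReplicatingRepartitioner<H> implements StateRepartitioner<H> {
    @Override
    public List<List<H>> repartition(List<H> previousHandles, int parallelism) {
        List<List<H>> result = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>(previousHandles)); // same handles for everyone
        }
        return result;
    }
}
{code}
The existing split-and-distribute mode would instead partition previousHandles 
into disjoint chunks, one per subtask.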

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink broadcast-op-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2948.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2948






> Introduce state handle replication mode for CheckpointCoordinator
> -
>
> Key: FLINK-5265
> URL: https://issues.apache.org/jira/browse/FLINK-5265
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Currently, the {{CheckpointCoordinator}} only supports repartitioning of 
> {{OperatorStateHandle}}s based on a split-and-distribute strategy. For future 
> state types, such as broadcast or union state, we need a different 
> repartitioning method that allows for replicating state handles to all 
> subtasks.
> This is the first step on the way to implementing broadcast and union states.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2948: [FLINK-5265] Introduce state handle replication mo...

2016-12-05 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2948

[FLINK-5265] Introduce state handle replication mode for 
CheckpointCoordinator

Currently, the ``CheckpointCoordinator`` only supports repartitioning of 
``OperatorStateHandle``s based on a split-and-distribute strategy. For future 
state types, such as broadcast or union state, we need a different 
repartitioning method that allows for replicating state handles to all subtasks.

This functionality is introduced with this PR and represents the first step 
on the way to implementing broadcast and union states.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink broadcast-op-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2948.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2948






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5265) Introduce state handle replication mode for CheckpointCoordinator

2016-12-05 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-5265:
-

Assignee: Stefan Richter

> Introduce state handle replication mode for CheckpointCoordinator
> -
>
> Key: FLINK-5265
> URL: https://issues.apache.org/jira/browse/FLINK-5265
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Currently, the {{CheckpointCoordinator}} only supports repartitioning of 
> {{OperatorStateHandle}}s based on a split-and-distribute strategy. For future 
> state types, such as broadcast or union state, we need a different 
> repartitioning method that allows for replicating state handles to all 
> subtasks.
> This is the first step on the way to implementing broadcast and union states.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723248#comment-15723248
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90943695
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -182,8 +186,37 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
}
loadConfigParameters(configuration);
}
-   
-   
+
+   /**
+* Get the character set used for the row delimiter, field delimiter,
+* comment string, and {@link FieldParser}.
+*
+* @return the charset
+*/
+   @PublicEvolving
+   public Charset getCharset() {
+   if (this.charset == null) {
+   this.charset = Charset.forName(charsetName);
+   }
+   return this.charset;
+   }
+
+   /**
+* Set the name of the character set used for the row delimiter, field
+* delimiter, comment string, and {@link FieldParser}.
+*
+* The delimiters and comment string are interpreted when set. Each
--- End diff --

Can we change the encoding of the current delimiter using the old charset 
and the new charset?
That would also allow setting a new charset without the need to set `\n` 
again to have a newline delimiter.
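
Roughly, a sketch of that idea (assuming the delimiter is stored as bytes next 
to the charset fields shown in the diff):
{code}
public void setCharset(Charset charset) {
    // Re-encode the current delimiter from the old charset to the new one,
    // so e.g. a "\n" delimiter survives a charset change without being reset.
    byte[] newDelimiter = new String(this.delimiter, getCharset()).getBytes(charset);
    this.charset = charset;
    this.charsetName = charset.name();
    this.delimiter = newDelimiter;
}
{code}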


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when the actual String instance is constructed, no encoding is specified; 
> e.g. on line 66:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which means whatever the platform default encoding is gets used. If contents 
> really are always ASCII (I would not count on that, as the parser is used from 
> the CSV reader), this is not a big deal, but it can lead to the usual 
> Latin-1-vs-UTF-8 issues.
> So I think the encoding should be specified explicitly, whatever it is to be: 
> the javadocs claim ASCII, so it could be "US-ASCII", but it could just as well 
> be UTF-8 or even ISO-8859-1.
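
A minimal sketch of the explicit-encoding variant (taking the javadocs at their 
word and using US-ASCII; the surrounding parser logic is unchanged):
{code}
// requires: import java.nio.charset.StandardCharsets;
// line 66, with the encoding made explicit instead of platform-dependent:
this.result = new String(bytes, startPos + 1, i - startPos - 2, StandardCharsets.US_ASCII);
{code}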



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723246#comment-15723246
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90944828
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -182,8 +186,37 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
}
loadConfigParameters(configuration);
}
-   
-   
+
+   /**
+* Get the character set used for the row delimiter, field delimiter,
+* comment string, and {@link FieldParser}.
+*
+* @return the charset
+*/
+   @PublicEvolving
+   public Charset getCharset() {
+   if (this.charset == null) {
+   this.charset = Charset.forName(charsetName);
+   }
+   return this.charset;
+   }
+
+   /**
+* Set the name of the character set used for the row delimiter, field
+* delimiter, comment string, and {@link FieldParser}.
+*
+* The delimiters and comment string are interpreted when set. Each
--- End diff --

Btw., the `DelimitedInputFormat` does not have a comment string or field 
parsers.


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when the actual String instance is constructed, no encoding is specified; 
> e.g. on line 66:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which means whatever the platform default encoding is gets used. If contents 
> really are always ASCII (I would not count on that, as the parser is used from 
> the CSV reader), this is not a big deal, but it can lead to the usual 
> Latin-1-vs-UTF-8 issues.
> So I think the encoding should be specified explicitly, whatever it is to be: 
> the javadocs claim ASCII, so it could be "US-ASCII", but it could just as well 
> be UTF-8 or even ISO-8859-1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723242#comment-15723242
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
Hi @greghogan , thank you for your review.
I'll try to address your review comments in the next couple of days.

Best regards,
Ivan.


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph whose set of vertices can be divided into two 
> disjoint sets such that each edge with a source vertex in the first set has a 
> target vertex in the second set. We would like to support efficient operations 
> for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723245#comment-15723245
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90944589
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -182,8 +186,37 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
}
loadConfigParameters(configuration);
}
-   
-   
+
+   /**
+* Get the character set used for the row delimiter, field delimiter,
+* comment string, and {@link FieldParser}.
+*
+* @return the charset
+*/
+   @PublicEvolving
+   public Charset getCharset() {
+   if (this.charset == null) {
+   this.charset = Charset.forName(charsetName);
+   }
+   return this.charset;
+   }
+
+   /**
+* Set the name of the character set used for the row delimiter, field
+* delimiter, comment string, and {@link FieldParser}.
+*
+* The delimiters and comment string are interpreted when set. Each
--- End diff --

`GenericCsvInputFormat` would need to override this method and call 
`super.setCharset()` to propagate the change.

What do you think @greghogan?


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when the actual String instance is constructed, no encoding is specified; 
> e.g. on line 66:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which means whatever the platform default encoding is gets used. If contents 
> really are always ASCII (I would not count on that, as the parser is used from 
> the CSV reader), this is not a big deal, but it can lead to the usual 
> Latin-1-vs-UTF-8 issues.
> So I think the encoding should be specified explicitly, whatever it is to be: 
> the javadocs claim ASCII, so it could be "US-ASCII", but it could just as well 
> be UTF-8 or even ISO-8859-1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723247#comment-15723247
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90945295
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
 ---
@@ -200,6 +200,30 @@ public void testReadCustomDelimiter() {
}
}
 
+   @Test
+   public void testReadCustomDelimiterWithCharset() throws IOException {
--- End diff --

This tests the `DelimitedInputFormat` with the default charset. 
Can you add a test with a different charset as well?


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when the actual String instance is constructed, no encoding is specified; 
> e.g. on line 66:
>this.result = new String(bytes, startPos+1, i - startPos - 2);
> which means whatever the platform default encoding is gets used. If contents 
> really are always ASCII (I would not count on that, as the parser is used from 
> the CSV reader), this is not a big deal, but it can lead to the usual 
> Latin-1-vs-UTF-8 issues.
> So I think the encoding should be specified explicitly, whatever it is to be: 
> the javadocs claim ASCII, so it could be "US-ASCII", but it could just as well 
> be UTF-8 or even ISO-8859-1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2901: [FLINK-3921] StringParser encoding

2016-12-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90944589
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -182,8 +186,37 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
}
loadConfigParameters(configuration);
}
-   
-   
+
+   /**
+* Get the character set used for the row delimiter, field delimiter,
+* comment string, and {@link FieldParser}.
+*
+* @return the charset
+*/
+   @PublicEvolving
+   public Charset getCharset() {
+   if (this.charset == null) {
+   this.charset = Charset.forName(charsetName);
+   }
+   return this.charset;
+   }
+
+   /**
+* Set the name of the character set used for the row delimiter, field
+* delimiter, comment string, and {@link FieldParser}.
+*
+* The delimiters and comment string are interpreted when set. Each
--- End diff --

`GenericCsvInputFormat` would need to override this method and call 
`super.setCharset()` to propagate the change.

What do you think @greghogan?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
Hi @greghogan , thank you for your review.
I'll try to address your review comments in the next couple of days.

Best regards,
Ivan.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2901: [FLINK-3921] StringParser encoding

2016-12-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90944828
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -182,8 +186,37 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
}
loadConfigParameters(configuration);
}
-   
-   
+
+   /**
+* Get the character set used for the row delimiter, field delimiter,
+* comment string, and {@link FieldParser}.
+*
+* @return the charset
+*/
+   @PublicEvolving
+   public Charset getCharset() {
+   if (this.charset == null) {
+   this.charset = Charset.forName(charsetName);
+   }
+   return this.charset;
+   }
+
+   /**
+* Set the name of the character set used for the row delimiter, field
+* delimiter, comment string, and {@link FieldParser}.
+*
+* The delimiters and comment string are interpreted when set. Each
--- End diff --

Btw., the `DelimitedInputFormat` does not have a comment string or field 
parsers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2901: [FLINK-3921] StringParser encoding

2016-12-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90945295
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
 ---
@@ -200,6 +200,30 @@ public void testReadCustomDelimiter() {
}
}
 
+   @Test
+   public void testReadCustomDelimiterWithCharset() throws IOException {
--- End diff --

This tests the `DelimitedInputFormat` with the default charset. 
Can you add a test with a different charset as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2901: [FLINK-3921] StringParser encoding

2016-12-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2901#discussion_r90943695
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -182,8 +186,37 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
}
loadConfigParameters(configuration);
}
-   
-   
+
+   /**
+* Get the character set used for the row delimiter, field delimiter,
+* comment string, and {@link FieldParser}.
+*
+* @return the charset
+*/
+   @PublicEvolving
+   public Charset getCharset() {
+   if (this.charset == null) {
+   this.charset = Charset.forName(charsetName);
+   }
+   return this.charset;
+   }
+
+   /**
+* Set the name of the character set used for the row delimiter, field
+* delimiter, comment string, and {@link FieldParser}.
+*
+* The delimiters and comment string are interpreted when set. Each
--- End diff --

Can we change the encoding of the current delimiter using the old charset 
and the new charset?
That would also allow setting a new charset without the need to set `\n` 
again to have a newline delimiter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5265) Introduce state handle replication mode for CheckpointCoordinator

2016-12-05 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5265:
-

 Summary: Introduce state handle replication mode for 
CheckpointCoordinator
 Key: FLINK-5265
 URL: https://issues.apache.org/jira/browse/FLINK-5265
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stefan Richter


Currently, the {{CheckpointCoordinator}} only supports repartitioning of 
{{OperatorStateHandle}}s based on a split-and-distribute strategy. For future 
state types, such as broadcast or union state, we need a different 
repartitioning method that allows for replicating state handles to all subtasks.

This is the first step on the way to implementing broadcast and union states.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723144#comment-15723144
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90938279
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/ProjectionTest.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProjectionTest {
--- End diff --

Is this test class necessary?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph whose set of vertices can be divided into two 
> disjoint sets such that each edge with a source vertex in the first set has a 
> target vertex in the second set. We would like to support efficient operations 
> for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723151#comment-15723151
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90923574
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ *
+ * A BipartiteEdge represents a link between a top vertex and a bottom vertex
+ * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class,
+ * with the only difference that the keys of the connected vertices can have
+ * different types.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteEdge<KT, KB, EV> extends Tuple3<KT, KB, EV> {
+
+   private static final long serialVersionUID = 1L;
+
+   public BipartiteEdge() {}
+
+   public BipartiteEdge(KT topId, KB bottomId, EV value) {
+   this.f0 = topId;
+   this.f1 = bottomId;
+   this.f2 = value;
+   }
+
+   public KT getTopId() {
+   return this.f0;
+   }
+
+   public void setTopId(KT i) {
--- End diff --

Parameter name "i" -> "topId"? Also, below for "i" -> "bottomId" and 
"newValue" -> "value"?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723153#comment-15723153
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90926642
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723160#comment-15723160
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90921530
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ *
--- End diff --

Empty line.


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723143#comment-15723143
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90926409
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
--- End diff --

Extra space.


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723155#comment-15723155
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90937740
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/BipartiteEdgeTest.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BipartiteEdgeTest {
+
+   private static final int BOTTOM_ID = 0;
+   private static final int TOP_ID = 1;
+   private static final String VALUE = "value";
+
+   private final BipartiteEdge<Integer, Integer, String> edge = createEdge();
+
+   @Test
+   public void testGetBottomId() {
+   assertEquals(Integer.valueOf(BOTTOM_ID), edge.getBottomId());
+   }
+
+   @Test
+   public void testGetTopId() {
+   assertEquals(Integer.valueOf(TOP_ID), edge.getTopId());
+   }
+
+   @Test
+   public void testGetValue() {
+   assertEquals(VALUE, edge.getValue());
+   }
+
+   @Test
+   public void testSetBottomId() {
+   edge.setBottomId(100);
+   assertEquals(Integer.valueOf(100), edge.getBottomId());
--- End diff --

Does auto-boxing not work here?
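
Auto-boxing would likely compile here, but with JUnit 4 a primitive int on one side and a boxed Integer on the other can fall between the assertEquals(long, long) and assertEquals(Object, Object) overloads; wrapping explicitly keeps both arguments as objects. A short sketch of the unambiguous alternatives (this is only the probable motivation, not confirmed by the PR author):

{code}
assertEquals(Integer.valueOf(100), edge.getBottomId());   // Object vs. Object: unambiguous
assertEquals(100, edge.getBottomId().intValue());         // long vs. long: also unambiguous
{code}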


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723163#comment-15723163
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90935587
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723161#comment-15723161
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90937167
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723149#comment-15723149
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90921785
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ *
+ * A BipartiteEdge represents a link between a top and bottom vertices
--- End diff --

"between a top" -> "between top", or similar.


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723159#comment-15723159
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90930301
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723152#comment-15723152
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90933026
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723154#comment-15723154
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90929487
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723156#comment-15723156
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90937555
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Projection.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple6;
+
+/**
+ * Result of projection of a connection between two vertices in a 
bipartite graph.
+ *
+ * @param <VK> the key type of vertices of an opposite set
+ * @param <VV> the value type of vertices of an opposite set
+ * @param <EV> the edge value type
+ */
+public class Projection<VK, VV, VVC, EV> extends Tuple6 {
--- End diff --

Missing comment for documenting `VVC`. Should `EV` be placed before `VVC`? 
And before `VK` and `VV`?
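
One possible shape of a fully documented (and reordered) type-parameter list, purely as a sketch: the parameter names follow this review comment, and the description of VVC is an assumption, since the PR does not document that parameter yet.

{code}
/**
 * Result of projection of a connection between two vertices in a bipartite graph.
 *
 * @param <EV>  the edge value type
 * @param <VK>  the key type of vertices of an opposite set
 * @param <VV>  the value type of vertices of an opposite set
 * @param <VVC> the value type of the connecting vertex (an assumption)
 */
{code}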


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723157#comment-15723157
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90924411
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
--- End diff --

Are there not four methods? Can we simply reference these, i.e. "This can 
be achieved using the projection methods."?
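
For context, a minimal usage sketch of a projection, assuming the API in this PR (the concrete vertex and edge types of the returned Graph are deliberately left open here):

{code}
BipartiteGraph<Long, String, String, String, String> bipartite =
	BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);

// Connect pairs of top vertices that share at least one bottom vertex.
// The "simple" projection keeps no information about the shared vertex;
// the "full" variants are expected to retain it via the Projection class.
Graph<?, ?, ?> projected = bipartite.projectionTopSimple();
{code}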


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723146#comment-15723146
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90924109
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
--- End diff --

For VVT (and similar for VVB), how about "the vertex value type for top 
vertices"?


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge having a source vertex in the 
> first set, will have a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723145#comment-15723145
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90931164
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90929487
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+   private final ExecutionEnvironment context;
+   private final DataSet<Vertex<KT, VVT>> topVertices;
+   private final DataSet<Vertex<KB, VVB>> bottomVertices;
+   private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+   private BipartiteGraph(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   this.topVertices = topVertices;
+   this.bottomVertices = bottomVertices;
+   this.edges = edges;
+   this.context = context;
+   }
+
+   /**
+* Create bipartite graph from datasets.
+*
+* @param topVertices  dataset of top vertices in the graph
+* @param bottomVertices dataset of bottom vertices in the graph
+* @param edges dataset of edges between vertices
+* @param context Flink execution context
+* @param <KT> the key type of the top vertices
+* @param <KB> the key type of the bottom vertices
+* @param <VVT> the top vertices value type
+* @param <VVB> the bottom vertices value type
+* @param <EV> the edge value type
+* @return new bipartite graph created from provided datasets
+*/
+   public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+   DataSet<Vertex<KT, VVT>> topVertices,
+   DataSet<Vertex<KB, VVB>> bottomVertices,
+   DataSet<BipartiteEdge<KT, KB, EV>> edges,
+   ExecutionEnvironment context) {
+   return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+   }
+
+   /**
+* Get dataset with top vertices.
+*
+* @return dataset with top vertices
+*/
+   public DataSet<Vertex<KT, VVT>> getTopVertices() {
+   return topVertices;
+   }
+
+   /**
+* Get dataset with bottom vertices.
+*
+* @return dataset with bottom vertices
+*/
+   public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+ 

[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723158#comment-15723158
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90923609
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ *
--- End diff --

Empty line.


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge has a source vertex in the 
> first set and a target vertex in the second set. We would like to 
> support efficient operations for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 





[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90924411
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
+ * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
+ * a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
+ * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
+ * {@link BipartiteGraph#projectionTopSimple()} or
--- End diff --

Are there not four methods? Can we simply reference these, i.e. "This can 
be achieved using the projection methods."?
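For orientation, a sketch of the four projections the naming scheme implies. Only `projectionTopSimple()` and `projectionBottomFull()` appear in the javadoc under review; `projectionTopFull()` and `projectionBottomSimple()` are assumptions inferred from the names, as are the result types:

{code}
// Assumes the graph from the earlier fromDataSet sketch; the edge value types
// are only sketched, since the diff above does not show the method signatures.
Graph<Long, String, ?> topSimple = graph.projectionTopSimple();
Graph<Long, String, ?> topFull = graph.projectionTopFull();             // assumed name
Graph<String, String, ?> bottomSimple = graph.projectionBottomSimple(); // assumed name
Graph<String, String, ?> bottomFull = graph.projectionBottomFull();
{code}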




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90924109
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
--- End diff --

For VVT (and similar for VVB), how about "the vertex value type for top 
vertices"?




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90923574
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ *
+ * A BipartiteEdge represents a link between a top and bottom vertices
+ * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
+ * with the only difference that key of connected vertices can have
+ * different types.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteEdge<KT, KB, EV> extends Tuple3<KT, KB, EV> {
+
+   private static final long serialVersionUID = 1L;
+
+   public BipartiteEdge() {}
+
+   public BipartiteEdge(KT topId, KB bottomId, EV value) {
+   this.f0 = topId;
+   this.f1 = bottomId;
+   this.f2 = value;
+   }
+
+   public KT getTopId() {
+   return this.f0;
+   }
+
+   public void setTopId(KT i) {
--- End diff --

Parameter name "i" -> "topId"? Also, below for "i" -> "bottomId" and 
"newValue" -> "value"?




[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723162#comment-15723162
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90938901
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 ---
@@ -480,7 +480,8 @@ protected static File asFile(String path) {
}
}

-   assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
+   assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)),
--- End diff --

The array contents are compared in the assertions that follow the test for 
length.
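In other words, the longer message may be redundant; a sketch of the pattern the comment describes, with the follow-on loop assumed for illustration:

{code}
// The length check fails fast; the element-wise assertions that follow
// already report any differing contents.
assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);

for (int i = 0; i < expectedStrings.length; i++) {
    assertEquals(expectedStrings[i], resultStrings[i]);
}
{code}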




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90937167
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90921530
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+/**
+ *
--- End diff --

Empty line.




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90926220
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/**
+ *
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
--- End diff --

What about "The vertices of a bipartite graph are divided into two disjoint 
sets, referenced by the names "top" and "bottom". Top and bottom vertices with 
the same key value represent distinct entities and must be specially handled 
when projecting to a simple {@link Graph}."?
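To make the key-collision point concrete, a small sketch (the dataset contents are illustrative assumptions):

{code}
// Top vertex 1L and bottom vertex 1L are distinct entities even though their
// keys collide, so a projection to a simple Graph must disambiguate them.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Vertex<Long, String>> top = env.fromElements(new Vertex<>(1L, "author #1"));
DataSet<Vertex<Long, String>> bottom = env.fromElements(new Vertex<>(1L, "paper #1"));
{code}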






[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90937740
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/BipartiteEdgeTest.java
 ---
@@ -0,0 +1,69 @@
+
+package org.apache.flink.graph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BipartiteEdgeTest {
+
+   private static final int BOTTOM_ID = 0;
+   private static final int TOP_ID = 1;
+   private static final String VALUE = "value";
+
+   private final BipartiteEdge<Integer, Integer, String> edge = createEdge();
+
+   @Test
+   public void testGetBottomId() {
+   assertEquals(Integer.valueOf(BOTTOM_ID), edge.getBottomId());
+   }
+
+   @Test
+   public void testGetTopId() {
+   assertEquals(Integer.valueOf(TOP_ID), edge.getTopId());
+   }
+
+   @Test
+   public void testGetValue() {
+   assertEquals(VALUE, edge.getValue());
+   }
+
+   @Test
+   public void testSetBottomId() {
+   edge.setBottomId(100);
+   assertEquals(Integer.valueOf(100), edge.getBottomId());
--- End diff --

Does auto-boxing not work here?
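For what it's worth, both forms compile here; a sketch assuming JUnit 4's `Assert`:

{code}
// Integer.valueOf(...) forces the assertEquals(Object, Object) overload.
assertEquals(Integer.valueOf(100), edge.getBottomId());

// A bare int literal resolves to assertEquals(long, long) instead, auto-unboxing
// the Integer, which would throw a NullPointerException if the getter returned null.
assertEquals(100, edge.getBottomId());
{code}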






[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723150#comment-15723150
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90929479
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90926409
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+
+   /**
+    * Create bipartite graph from datasets.
+    *
+    * @param topVertices  dataset of top vertices in the graph
--- End diff --

Extra space.




[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723148#comment-15723148
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90922934
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+/**
+ *
+ * A BipartiteEdge represents a link between a top and bottom vertices
+ * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
--- End diff --

"It is generalized form of {@link Edge} where the source and target vertex 
IDs can be of different types.", or similar?
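A sketch of that distinction in user code; the concrete type choices are illustrative:

{code}
// Edge fixes a single key type for both endpoints...
Edge<Long, Double> plain = new Edge<>(1L, 2L, 0.5);

// ...while BipartiteEdge allows the two endpoint key types to differ.
BipartiteEdge<Long, String, Double> bipartite = new BipartiteEdge<>(1L, "p1", 0.5);
{code}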




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90921785
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+/**
+ *
+ * A BipartiteEdge represents a link between a top and bottom vertices
--- End diff --

"between a top" -> "between top", or similar.




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90937555
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Projection.java
 ---
@@ -0,0 +1,68 @@
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple6;
+
+/**
+ * Result of projection of a connection between two vertices in a bipartite graph.
+ *
+ * @param <VK> the key type of vertices of an opposite set
+ * @param <VV> the value type of vertices of an opposite set
+ * @param <EV> the edge value type
+ */
+public class Projection<VK, VV, VVC, EV> extends Tuple6 {
--- End diff --

Missing comment for documenting `VVC`. Should `EV` be placed before `VVC`? 
And before `VK` and `VV`?
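One way to complete the type parameter documentation; the description of `VVC` is an assumption inferred from the projection context, not taken from the diff:

{code}
/**
 * Result of projection of a connection between two vertices in a bipartite graph.
 *
 * @param <VK> the key type of vertices of an opposite set
 * @param <VV> the value type of vertices of an opposite set
 * @param <VVC> the value type of the vertex the two projected vertices connect through (assumed)
 * @param <EV> the edge value type
 */
{code}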






[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90931164
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90930301
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90933026
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90935587
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Bipartite graph is a graph whose vertices can be divided into two disjoint sets:
+ * top vertices and bottom vertices. Edges can only exist between a pair of vertices
+ * from different vertex sets, e.g. there can be no edges between a pair of top vertices.
+ *
+ * Bipartite graphs are useful to represent graphs with two sets of objects, like
+ * researchers and their publications, where an edge represents that a particular
+ * publication was authored by a particular author.
+ *
+ * The bipartite interface is different from the {@link Graph} interface, so to apply
+ * algorithms that work on a regular graph a bipartite graph should first be converted
+ * into a {@link Graph} instance. This can be achieved by using the
+ * {@link BipartiteGraph#projectionTopSimple()} or
+ * {@link BipartiteGraph#projectionBottomFull()} methods.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <VVT> the top vertices value type
+ * @param <VVB> the bottom vertices value type
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+    private final ExecutionEnvironment context;
+    private final DataSet<Vertex<KT, VVT>> topVertices;
+    private final DataSet<Vertex<KB, VVB>> bottomVertices;
+    private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+    private BipartiteGraph(
+            DataSet<Vertex<KT, VVT>> topVertices,
+            DataSet<Vertex<KB, VVB>> bottomVertices,
+            DataSet<BipartiteEdge<KT, KB, EV>> edges,
+            ExecutionEnvironment context) {
+        this.topVertices = topVertices;
+        this.bottomVertices = bottomVertices;
+        this.edges = edges;
+        this.context = context;
+    }
+
+    /**
+     * Create bipartite graph from datasets.
+     *
+     * @param topVertices dataset of top vertices in the graph
+     * @param bottomVertices dataset of bottom vertices in the graph
+     * @param edges dataset of edges between vertices
+     * @param context Flink execution context
+     * @param <KT> the key type of the top vertices
+     * @param <KB> the key type of the bottom vertices
+     * @param <VVT> the top vertices value type
+     * @param <VVB> the bottom vertices value type
+     * @param <EV> the edge value type
+     * @return new bipartite graph created from provided datasets
+     */
+    public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+            DataSet<Vertex<KT, VVT>> topVertices,
+            DataSet<Vertex<KB, VVB>> bottomVertices,
+            DataSet<BipartiteEdge<KT, KB, EV>> edges,
+            ExecutionEnvironment context) {
+        return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+    }
+
+    /**
+     * Get dataset with top vertices.
+     *
+     * @return dataset with top vertices
+     */
+    public DataSet<Vertex<KT, VVT>> getTopVertices() {
+        return topVertices;
+    }
+
+    /**
+     * Get dataset with bottom vertices.
+     *
+     * @return dataset with bottom vertices
+     */
+    public DataSet<Vertex<KB, VVB>> getBottomVertices() {
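For context, a hedged usage sketch of the quoted {{fromDataSet}} factory: the data values are invented, and the {{BipartiteEdge}} constructor argument order (top ID, bottom ID, edge value) is an assumption based on the Tuple3 layout quoted later in this thread.

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.BipartiteEdge;
import org.apache.flink.graph.BipartiteGraph;
import org.apache.flink.graph.Vertex;

public class BipartiteGraphUsage {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Top vertices: authors keyed by Long; bottom vertices: papers keyed by String.
        DataSet<Vertex<Long, String>> top = env.fromElements(new Vertex<>(1L, "author-1"));
        DataSet<Vertex<String, String>> bottom = env.fromElements(new Vertex<>("paper-a", "title-a"));
        DataSet<BipartiteEdge<Long, String, Double>> edges =
            env.fromElements(new BipartiteEdge<>(1L, "paper-a", 1.0));

        // The two vertex sets may use different key types (Long vs String).
        BipartiteGraph<Long, String, String, String, Double> graph =
            BipartiteGraph.fromDataSet(top, bottom, edges, env);

        graph.getTopVertices().print();
    }
}
{code}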

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiteGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90926642
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiteGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90929479
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
 ---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiteGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90922934
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ *
+ * A BipartiteEdge represents a link between a top and a bottom vertex
+ * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
--- End diff --

"It is generalized form of {@link Edge} where the source and target vertex 
IDs can be of different types.", or similar?
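To make the suggested wording concrete, a minimal sketch (hypothetical names, not the PR's code) of an edge type whose two endpoint IDs may have different types, which a regular {@link Edge} with a single key type cannot express:

{code}
import org.apache.flink.api.java.tuple.Tuple3;

// Hypothetical illustration only: an Edge fixes both endpoint IDs to one key
// type K, while a bipartite edge can pair a top ID of type KT with a bottom
// ID of type KB.
public class GeneralizedEdge<KT, KB, EV> extends Tuple3<KT, KB, EV> {
    public GeneralizedEdge() {}

    public GeneralizedEdge(KT topId, KB bottomId, EV value) {
        super(topId, bottomId, value);
    }

    public KT getTopId() { return f0; }
    public KB getBottomId() { return f1; }
    public EV getValue() { return f2; }
}
{code}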




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiteGraph class

2016-12-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2564#discussion_r90938279
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/ProjectionTest.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProjectionTest {
--- End diff --

Is this test class necessary?




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723129#comment-15723129
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @tonycox, the plan choice issues might be related to FLINK-5226. I have a PR pending to fix this issue: #2926. You could try to build your PR on top of my PR (or, for the sake of simplicity, simply copy the changes).

Please let me know if that solves the problem. If not, I'll be happy to check out your branch to investigate what is going wrong.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.
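For illustration, a minimal Java sketch of the same push-down idea; all names here are hypothetical, and this is not the Scala trait proposed above:

{code}
// Hypothetical sketch: a source records the projected fields so its scan can
// emit only those columns, enabling projection push-down into the source.
public interface ProjectableSourceSketch {
    /** Restrict the scan to the given fields before execution. */
    void setProjection(String[] fields);
}

class ProjectableCsvSourceSketch implements ProjectableSourceSketch {
    private String[] projectedFields = new String[0];

    @Override
    public void setProjection(String[] fields) {
        // The scan later reads only these columns from the file.
        this.projectedFields = fields.clone();
    }
}
{code}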



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2810
  
Hi @tonycox, the plan choice issues might be related to FLINK-5226. I have a PR pending to fix this issue: #2926. You could try to build your PR on top of my PR (or, for the sake of simplicity, simply copy the changes).

Please let me know if that solves the problem. If not, I'll be happy to check out your branch to investigate what is going wrong.




[GitHub] flink issue #2913: [backport] [FLINK-5114] [network] Handle partition produc...

2016-12-05 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2913
  
> Why is that necessary? Can we not just assume that if the attempt is not 
equal to the current execution attempt, then the status is some form of 
"disposed".

It's not necessary. It's perfectly fine to do it as you describe. Not 
having the `currentExecution` set to the producer execution means that the 
producer was restarted (hence cancelled or failed). This only made the handling 
in `Task` easier, but it should not dictate this change in the 
`ExecutionVertex`. I'll change that to only check the `currentExecution` and 
handle it accordingly.
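A minimal sketch of the rule described above, assuming the `ExecutionVertex` and `ExecutionAttemptID` types from the Flink runtime; the helper itself is illustrative, not the actual fix:

{code}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;

// Hedged sketch: an attempt that is no longer the vertex's current execution
// is treated as disposed (restarted, cancelled, or failed), so the partition
// should not be fetched for it any more.
final class PartitionProducerCheck {
    static boolean isCurrentAttempt(ExecutionVertex vertex, ExecutionAttemptID attemptId) {
        return vertex.getCurrentExecutionAttempt().getAttemptId().equals(attemptId);
    }
}
{code}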




[jira] [Commented] (FLINK-5114) PartitionState update with finished execution fails

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723031#comment-15723031
 ] 

ASF GitHub Bot commented on FLINK-5114:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2913
  
> Why is that necessary? Can we not just assume that if the attempt is not 
equal to the current execution attempt, then the status is some form of 
"disposed".

It's not necessary. It's perfectly fine to do it as you describe. Not 
having the `currentExecution` set to the producer execution means that the 
producer was restarted (hence cancelled or failed). This only made the handling 
in `Task` easier, but it should not dictate this change in the 
`ExecutionVertex`. I'll change that to only check the `currentExecution` and 
handle it accordingly.


> PartitionState update with finished execution fails
> ---
>
> Key: FLINK-5114
> URL: https://issues.apache.org/jira/browse/FLINK-5114
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> If a partition state request is triggered for a producer that finishes before 
> the request arrives, the execution is unregistered and the producer cannot be 
> found. In this case the PartitionState returns null and the job fails.
> We need to check the producer location via the intermediate result partition 
> in this case.
> See here: https://api.travis-ci.org/jobs/177668505/log.txt?deansi=true



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5091) Formalize the AppMaster environment for docker compability

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723027#comment-15723027
 ] 

ASF GitHub Bot commented on FLINK-5091:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2915
  
Looks really good! Merging with some minor changes.


> Formalize the AppMaster environment for docker compability
> --
>
> Key: FLINK-5091
> URL: https://issues.apache.org/jira/browse/FLINK-5091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
> Fix For: 1.2.0
>
>
> For scenarios where the AppMaster is launched from a docker image, it would 
> be ideal to use the installed Flink rather than rely on a special file layout 
> in the sandbox directory.
> This is related to DCOS integration, which (in 1.2) will launch the AppMaster 
> via Marathon (as a top-level DCOS service).  The existing code assumed that 
> only the dispatcher (coming in 1.3) would launch the AppMaster.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2915: [FLINK-5091] Formalize the Mesos AppMaster environment fo...

2016-12-05 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2915
  
Looks really good! Merging with some minor changes.




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722920#comment-15722920
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
I see, @fhueske, setting an order of scanning is a much better way. 
I have a problem when the plan is chosen: the best expression is not correct. The rules work, but the planner decides to choose the `MapFunction` anyway, without the projection.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2810: [FLINK-3848] Add ProjectableTableSource interface and tra...

2016-12-05 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2810
  
I see, @fhueske, setting an order of scanning is a much better way. 
I have a problem when the plan is chosen: the best expression is not correct. The rules work, but the planner decides to choose the `MapFunction` anyway, without the projection.




[jira] [Commented] (FLINK-5114) PartitionState update with finished execution fails

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722887#comment-15722887
 ] 

ASF GitHub Bot commented on FLINK-5114:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2913
  
Having a quick look at this: I think this breaks with a fundamental design 
in the ExecutionGraph:
The `findExecutionAttemptWithId(...)` method searches the prior execution 
attempts.

Why is that necessary? Can we not just assume that if the attempt is not 
equal to the current execution attempt, then the status is some form of 
"disposed".

If the produced result is finished, the execution will still not be in the 
"prior execution attempts". That can only happen once the task restarts, in 
which case you should not try and fetch the partition any more.


> PartitionState update with finished execution fails
> ---
>
> Key: FLINK-5114
> URL: https://issues.apache.org/jira/browse/FLINK-5114
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> If a partition state request is triggered for a producer that finishes before 
> the request arrives, the execution is unregistered and the producer cannot be 
> found. In this case the PartitionState returns null and the job fails.
> We need to check the producer location via the intermediate result partition 
> in this case.
> See here: https://api.travis-ci.org/jobs/177668505/log.txt?deansi=true



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2913: [backport] [FLINK-5114] [network] Handle partition produc...

2016-12-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2913
  
Having a quick look at this: I think this breaks with a fundamental design 
in the ExecutionGraph:
The `findExecutionAttemptWithId(...)` method searches the prior execution 
attempts.

Why is that necessary? Can we not just assume that if the attempt is not 
equal to the current execution attempt, then the status is some form of 
"disposed".

If the produced result is finished, the execution will still not be in the 
"prior execution attempts". That can only happen once the task restarts, in 
which case you should not try and fetch the partition any more.




[jira] [Commented] (FLINK-5258) reorganize the docs to improve navigation and reduce duplication

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722871#comment-15722871
 ] 

ASF GitHub Bot commented on FLINK-5258:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2940
  
Thanks a lot for this pull request!
I think this looks very good - much better than the current state.

Two changes I would make:
  - Drop "Projects and Dependencies", it is so outdated that it is 
misleading these days
  - The same holds for "How to add a new DataSet Operator"

Otherwise, I would merge this, as it is a big improvement over the current 
version.


> reorganize the docs to improve navigation and reduce duplication
> 
>
> Key: FLINK-5258
> URL: https://issues.apache.org/jira/browse/FLINK-5258
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>
> Goals:
> * reduce duplication: ideally every piece of information would have one 
> natural place to be
> * improve navigation and discoverability: important topics shouldn't be hard 
> to find
> * arrange the content in a natural, linear ordering for those who want to 
> read (or skim through) everything



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2940: [FLINK-5258] [docs] reorganize the docs to improve naviga...

2016-12-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2940
  
Thanks a lot for this pull request!
I think this looks very good - much better than the current state.

Two changes I would make:
  - Drop "Projects and Dependencies", it is so outdated that it is 
misleading these days
  - The same holds for "How to add a new DataSet Operator"

Otherwise, I would merge this, as it is a big improvement over the current 
version.




[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722864#comment-15722864
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2901
  
@fhueske, please review the additional commits.


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when constructing the actual instance, no encoding is specified; e.g. on line 66:
>    this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever the default platform encoding is. If contents 
> really are always ASCII (I would not count on that, as the parser is used from the 
> CSV reader), it is not a big deal, but it can lead to the usual Latin-1-vs-UTF-8 issues.
> So I think the encoding should be explicitly specified, whatever it is to be 
> used: the javadocs claim ASCII, so it could be "us-ascii", but it could well be UTF-8 
> or even ISO-8859-1.
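For illustration, a hedged sketch of the explicit-charset fix the issue asks for; the class and method names are hypothetical, and whether US-ASCII or UTF-8 is right is exactly the open question above:

{code}
import java.nio.charset.StandardCharsets;

// Hedged sketch: pass an explicit charset instead of relying on the platform
// default. The offsets mirror the quoted line from StringParser.
final class StringParserCharsetSketch {
    static String parseQuoted(byte[] bytes, int startPos, int i) {
        return new String(bytes, startPos + 1, i - startPos - 2, StandardCharsets.US_ASCII);
    }
}
{code}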



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2901: [FLINK-3921] StringParser encoding

2016-12-05 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2901
  
@fhueske, please review the additional commits.




[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722849#comment-15722849
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90917049
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

		@Override
		public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+			boolean allRequestsRepeatable = true;
			if (response.hasFailures()) {
				for (BulkItemResponse itemResp : response.getItems()) {
					if (itemResp.isFailed()) {
-						LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-						failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+						// Check if index request can be retried
+						String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+						if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+							|| failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+							|| (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+						) {
+							LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+							reAddBulkRequest(request);
+						} else { // Cannot retry action
+							allRequestsRepeatable = false;
+							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+						}
					}
				}
-				hasFailure.set(true);
+				if (!allRequestsRepeatable) {
+					hasFailure.set(true);
+				}
			}
		}

		@Override
		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-			LOG.error(failure.getMessage());
-			failureThrowable.compareAndSet(null, failure);
-			hasFailure.set(true);
+			if (failure instanceof ClusterBlockException // Examples: "no master"
+				|| failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+			) {
+				LOG.debug("Retry batch on throwable: " + failure.getMessage());
+				reAddBulkRequest(request);
+			} else {
+				LOG.error("Failed to index bulk in Elasticsearch. " + failure.getMessage());
--- End diff --

String concat


> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster had high 

[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722845#comment-15722845
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916759
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
+						String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+						if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

This string-based error matching seems to be a pretty unstable mechanism.
Can you add a flag to control whether the mechanism is enabled, and disable it by default (but document it on the ES connector page)?
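A minimal sketch of such an opt-in flag, with all names hypothetical:

{code}
// Hedged sketch of the requested flag: message-based retry detection runs
// only when explicitly enabled, and is off by default as the review asks.
final class RetryPolicySketch {
    private final boolean retryOnTransientErrors;

    RetryPolicySketch(boolean retryOnTransientErrors) {
        this.retryOnTransientErrors = retryOnTransientErrors;
    }

    boolean shouldRetry(String failureMessage) {
        if (!retryOnTransientErrors) {
            return false; // default: fail fast, never retry on message matching
        }
        String m = failureMessage.toLowerCase();
        return m.contains("timeout") || m.contains("timed out");
    }
}
{code}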


> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at 
> least once" semantics in the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be tried again instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722847#comment-15722847
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90917039
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
+			if (failure instanceof ClusterBlockException // Examples: "no master"
+				|| failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+			) {
+				LOG.debug("Retry batch on throwable: " + failure.getMessage());
--- End diff --

String concat


> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at 
> least once" semantics in the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk 

[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722850#comment-15722850
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90917303
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
	}

+	/**
+	 * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+	 * @param bulkRequest
+	 */
+	public void reAddBulkRequest(BulkRequest bulkRequest) {
+		//TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
--- End diff --

So what about this TODO? Can we somehow filter these requests?
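One possible direction for that TODO, sketched under the assumption of the Elasticsearch 2.x client API; the helper name and approach are illustrative only:

{code}
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;

// Hedged sketch: re-add only non-delete actions, so a DeleteAction that
// already succeeded cannot fail the retried bulk.
final class BulkRetryFilterSketch {
    @SuppressWarnings("rawtypes")
    static void reAddWithoutDeletes(BulkRequest request, BulkProcessor processor) {
        for (ActionRequest action : request.requests()) {
            if (!(action instanceof DeleteRequest)) {
                processor.add(action);
            }
        }
    }
}
{code}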


> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at 
> least once" semantics in the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be tried again instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722848#comment-15722848
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916979
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
+						} else { // Cannot retry action
+							allRequestsRepeatable = false;
+							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
--- End diff --

{} instead of string concat.


> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at 
> least once" semantics in the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be tried again instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722846#comment-15722846
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916177
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
+						) {
+							LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

I would log here at a higher logging level.
Also, could you not use string concatenation here and instead use the LOG.debug("Retry batch: {}", itemResp.getFailureMessage()); pattern?


> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at 
> least once" semantics in the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be tried again instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916177
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
+						) {
+							LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

I would log here at a higher logging level.
Also, could you not use string concatenation here and instead use the LOG.debug("Retry batch: {}", itemResp.getFailureMessage()); pattern?




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90916759
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
+						String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+						if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

This string-based error matching seems to be a pretty unstable mechanism.
Can you add a flag to control whether the mechanism is enabled, and disable it by default (but document it on the ES connector page)?




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r90917049
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
+			} else {
+				LOG.error("Failed to index bulk in Elasticsearch. " + failure.getMessage());
--- End diff --

String concat



