[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations

2016-12-12 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744495#comment-15744495
 ] 

Vasia Kalavri commented on FLINK-5245:
--

My point is not that these features are useless for bipartite graphs, but that 
we have to think about whether re-implementing them specifically for bipartite 
graphs makes sense, e.g. because general graphs do not support them or because 
we can use the knowledge that we have a bipartite graph to make the 
implementation more efficient. For example, projection is a transformation that 
can only be applied to bipartite graphs. But if all you want is to get the 
degrees of your bipartite graph, can you use the available Graph methods? Or 
can we provide a better way to get the degrees because we know we have a 
bipartite graph? These are the questions we have to ask for each of the 
features in the list, in my opinion.
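
A minimal sketch of the bipartite-aware option (all names are illustrative, not 
an actual Gelly API): since each bipartite edge already carries both endpoint 
ids, the degrees fall out of one pass over the edge list, with no projection or 
Graph construction needed.
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch only: bipartite edges modeled as Tuple2<topId, bottomId>.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, Long>> edges = env.fromElements(
    new Tuple2<>(1L, 10L), new Tuple2<>(1L, 11L), new Tuple2<>(2L, 10L));

// Degree of each top vertex, computed in one pass over the edge list.
DataSet<Tuple2<Long, Long>> topDegrees = edges
    .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
        @Override
        public Tuple2<Long, Long> map(Tuple2<Long, Long> edge) {
            return new Tuple2<>(edge.f0, 1L); // one count per incident edge
        }
    })
    .groupBy(0) // group by top vertex id
    .sum(1);    // degree = number of incident edges
{code}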

> Add support for BipartiteGraph mutations
> 
>
> Key: FLINK-5245
> URL: https://issues.apache.org/jira/browse/FLINK-5245
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement methods for adding and removing vertices and edges similarly to 
> Graph class.
> Depends on https://issues.apache.org/jira/browse/FLINK-2254





[jira] [Updated] (FLINK-5324) JVM options take effect for both YarnApplicationMasterRunner and YarnTaskManager in yarn mode

2016-12-12 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-5324:
---
Description: 
YarnApplicationMasterRunner and YarnTaskManager both use the following code to 
get their JVM options:
{code}
final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code}
so when we add JVM options for one of them, they are applied to both.

  was:
YarnApplicationMasterRunner and YarnTaskManager both use follow code to get jvm 
options
{code}
final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code/}
so when we add some jvm options for one of them ,it will be both worked


> JVM options take effect for both YarnApplicationMasterRunner and 
> YarnTaskManager in yarn mode
> 
>
> Key: FLINK-5324
> URL: https://issues.apache.org/jira/browse/FLINK-5324
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.3
>Reporter: yuemeng
>Priority: Critical
>
> YarnApplicationMasterRunner and YarnTaskManager both use the following code to 
> get their JVM options:
> {code}
> final String javaOpts = 
> flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
> {code}
> so when we add JVM options for one of them, they are applied to both.
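
One possible fix is to introduce role-specific configuration keys that fall back 
to the shared one; a hedged sketch (the key names below are assumptions for 
illustration, not existing Flink options):
{code}
// Read a role-specific key first, falling back to the shared key.
final String sharedOpts = flinkConfiguration.getString("env.java.opts", "");
// In YarnTaskManager:
final String tmJavaOpts =
    flinkConfiguration.getString("env.java.opts.taskmanager", sharedOpts);
// In YarnApplicationMasterRunner:
final String amJavaOpts =
    flinkConfiguration.getString("env.java.opts.jobmanager", sharedOpts);
{code}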





[jira] [Created] (FLINK-5324) JVM options take effect for both YarnApplicationMasterRunner and YarnTaskManager in yarn mode

2016-12-12 Thread yuemeng (JIRA)
yuemeng created FLINK-5324:
--

 Summary: JVM options take effect for both 
YarnApplicationMasterRunner and YarnTaskManager in yarn mode
 Key: FLINK-5324
 URL: https://issues.apache.org/jira/browse/FLINK-5324
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.1.3
Reporter: yuemeng
Priority: Critical


YarnApplicationMasterRunner and YarnTaskManager both use the following code to 
get their JVM options:
{code}
final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code}
so when we add JVM options for one of them, they are applied to both.





[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2016-12-12 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744479#comment-15744479
 ] 

Fabian Hueske commented on FLINK-3475:
--

Hi [~ykt836], sure. That's fine with me and would be great!
I don't have time to work on it myself and just wanted to sketch a possible 
design. 
Please let me know if you have questions about it or if you have other 
suggestions.

> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Chengxiang Li
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.
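
The reuse idea in the description can be pictured with the DataSet API: 
de-duplicate first, then run the unchanged aggregate. A minimal sketch under 
that framing (not the SQL design sketched in the comments):
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// COUNT(DISTINCT value) per key, expressed as distinct-then-aggregate.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> input = env.fromElements(
    new Tuple2<>("a", 1), new Tuple2<>("a", 1), new Tuple2<>("a", 2));

DataSet<Tuple2<String, Integer>> distinctCounts = input
    .distinct(0, 1) // duplicate (key, value) pairs collapse to one record
    .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> t) {
            return new Tuple2<>(t.f0, 1); // count 1 per distinct value
        }
    })
    .groupBy(0)
    .sum(1); // yields ("a", 2): two distinct values for key "a"
{code}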





[jira] [Commented] (FLINK-5311) Write user documentation for BipartiteGraph

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744476#comment-15744476
 ] 

ASF GitHub Bot commented on FLINK-5311:
---

Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2984
  
Thank you for the update @mushketyk! I still don't see any link from the 
Gelly guide page to the bipartite docs though. Can you please add that too? 
Otherwise people won't be able to find the docs :)
As for the images, I think it would be nice to have one showing how a 
projection works; see the sketch below.
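
A documentation snippet of the kind that could accompany such an image (the 
method name comes from the BipartiteGraph work tracked in FLINK-2254; treat the 
exact signature and generic parameters as assumptions):
{code}
// A simple top projection connects two top vertices whenever they share at
// least one bottom vertex; the result is an ordinary Gelly Graph, so all
// existing Graph methods apply to it.
Graph<Long, String, Tuple2<String, String>> topGraph =
    bipartiteGraph.projectionTopSimple();
{code}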


> Write user documentation for BipartiteGraph
> ---
>
> Key: FLINK-5311
> URL: https://issues.apache.org/jira/browse/FLINK-5311
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> We need to add user documentation. The progress on BipartiteGraph can be 
> tracked in the following JIRA:
> https://issues.apache.org/jira/browse/FLINK-2254





[GitHub] flink issue #2984: [FLINK-5311] Add user documentation for bipartite graph

2016-12-12 Thread vasia
Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2984
  
Thank you for the update @mushketyk! I still don't see any link from the 
Gelly guide page to the bipartite docs though. Can you please add that too? 
Otherwise people won't be able to find the docs :)
As for the images, I think it would be nice to have one showing how a 
projection works.




[jira] [Commented] (FLINK-4648) Implement bipartite graph generators

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1570#comment-1570
 ] 

ASF GitHub Bot commented on FLINK-4648:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2986
  
Hi @greghogan ,
Thank you for your review. I've updated my PR accordingly.


> Implement bipartite graph generators
> 
>
> Key: FLINK-4648
> URL: https://issues.apache.org/jira/browse/FLINK-4648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement generators for bipartite graphs.
> Should implement at least:
> * *BipartiteGraphGenerator* (maybe requires a better name) that will generate 
> a bipartite graph where every vertex of one set is connected only to some 
> vertices from another set
> * *CompleteBipartiteGraphGenerator* that will generate a graph where every 
> vertex of one set is connected to every vertex of another set





[GitHub] flink issue #2986: [FLINK-4648] Implement BipartiteGraph generator

2016-12-12 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2986
  
Hi @greghogan ,
Thank you for your review. I've updated my PR accordingly.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-12 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743965#comment-15743965
 ] 

Jark Wu commented on FLINK-5280:


Hi [~ivan.mushketyk], I will try to answer your questions.

The first point of confusion is that {{CodeGenerator}} doesn't support nested 
access. Actually, this has been fixed in FLINK-4294, and you can have a look at 
the test example {{CompositeAccessTest}}. I think it will give you some 
inspiration. 

The other problem is that {{BatchScan#convertToExpectedType}} converts the 
input dataset into the Row type. Actually, it does not flatten the nested 
fields, but keeps the same schema in the Row. In your case, the ParentPojo will 
be converted to a Row type with {{Row(child: ChildPojo, num: Int)}}.

Hope that helps.
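
To make the nested access concrete, a hedged sketch (the POJO shapes, table 
name, and registration code are made up for illustration; package names follow 
the 1.2-era flink-table module):
{code}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

// ParentPojo has fields (ChildPojo child, int num); ChildPojo has (String name).
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerDataSet("Parents", parentDataSet);

// The Row schema keeps the nesting, so nested fields are addressed with dots:
Table result = tableEnv.sql("SELECT Parents.child.name, num FROM Parents");
{code}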

> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.





[jira] [Comment Edited] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2016-12-12 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506583#comment-15506583
 ] 

Ted Yu edited comment on FLINK-4534 at 12/13/16 2:06 AM:
-

Feel free to work on this.

Thanks, Liwei.


was (Author: yuzhih...@gmail.com):
Feel free to work on this.

Thanks, Liwei

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
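
The straightforward fix would be a sketch like the following (not the committed 
patch; the monitor object and loop body are assumptions based on the 
description above):
{code}
// Take the same lock that guards state.bucketStates in the other methods
// before iterating in restoreState().
synchronized (state.bucketStates) {
    for (BucketState<T> bucketState : state.bucketStates.values()) {
        // ... existing per-bucket restore handling, unchanged ...
    }
}
{code}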





[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2016-12-12 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743753#comment-15743753
 ] 

Kurt Young commented on FLINK-3475:
---

We are willing to offer some help on this issue, [~fhueske] [~chengxiang li]. 
Is this ok for both of you?

> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Chengxiang Li
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.





[jira] [Created] (FLINK-5323) CheckpointNotifier should be removed from docs

2016-12-12 Thread Abhishek Singh (JIRA)
Abhishek Singh created FLINK-5323:
-

 Summary: CheckpointNotifier should be removed from docs
 Key: FLINK-5323
 URL: https://issues.apache.org/jira/browse/FLINK-5323
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.3
Reporter: Abhishek Singh
Priority: Trivial


I was following the official documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html

Looks like this is the right one to be using: import 
org.apache.flink.runtime.state.CheckpointListener;

-Abhishek-

On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh wrote:
wrote:

I can’t seem to find CheckpointNotifier. Appreciate help!

CheckpointNotifier is not a member of package 
org.apache.flink.streaming.api.checkpoint

From my pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.1.3</version>
</dependency>
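
For reference, a minimal sketch of the replacement interface in use (the sink 
class itself is hypothetical):
{code}
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Hypothetical sink showing CheckpointListener, the replacement for the
// CheckpointNotifier still mentioned in the docs.
public class MySink implements SinkFunction<String>, CheckpointListener {

    @Override
    public void invoke(String value) throws Exception {
        // buffer or write the record; side effects are committed below
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // safe point to commit side effects for the completed checkpoint
    }
}
{code}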





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743344#comment-15743344
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
To suggest some way to fix the guarantees: To my mind, the crux lies in the 
way that the feedback channel is implemented - a simple blocking queue just 
does not cut it for that case. To make this proper, I think we need to do the 
following:
  - Have an elastic feedback channel (unbounded) with a certain memory 
budget, that can spill if needed. I think it would be best implemented holding 
data serialized.
  - On checkpoint, one simply adds the feedback channel data (already 
bytes) to the checkpoint
  - The source task should probably prioritize reading from the feedback 
channel, to keep it always as small as possible.
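
A toy illustration of the first bullet (plain Java, not Flink code; all names 
made up): an unbounded feedback buffer with a memory budget that spills 
serialized records to disk once the budget is exhausted.
{code}
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;

// Toy sketch only: single-threaded, no recovery or read path shown.
class ElasticFeedbackChannel {
    private final ArrayDeque<byte[]> inMemory = new ArrayDeque<>();
    private final File spillFile;
    private DataOutputStream spill; // opened lazily on first spill
    private long memoryUsed = 0;
    private final long memoryBudget;

    ElasticFeedbackChannel(long memoryBudget, File spillFile) {
        this.memoryBudget = memoryBudget;
        this.spillFile = spillFile;
    }

    void put(byte[] serializedRecord) throws IOException {
        if (memoryUsed + serializedRecord.length <= memoryBudget) {
            inMemory.addLast(serializedRecord); // within budget: keep in memory
            memoryUsed += serializedRecord.length;
        } else {
            if (spill == null) {
                spill = new DataOutputStream(new FileOutputStream(spillFile));
            }
            spill.writeInt(serializedRecord.length); // beyond budget: spill bytes
            spill.write(serializedRecord);
        }
    }

    // On checkpoint, the buffered bytes could be appended to the snapshot as-is.
}
{code}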



> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as 
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager start 
> block output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but 
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603





[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743343#comment-15743343
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
Thanks for the reminder, I went over the code today. The code looks mostly 
good, but here are some thoughts:

  - The head task supports only one concurrent checkpoint. In general, the 
tasks need to support multiple checkpoints being in progress at the same time. 
It frequently happens when people trigger savepoints concurrent to a running 
checkpoint. I think that is important to support.

  - The tail task offers the elements to the blocking queue. That means 
records are simply dropped if the capacity-bound queue (one element) is not 
polled by the head task in time.

  - With the capacity bound in the feedback queue, it is pretty easy to 
build a full deadlock. Just use a loop function that explodes data into the 
feedback channel.

  - Recent code also introduced the ability to change parallelism. What are 
the semantics here when the parallelism of the loop is changed?

Since loops did not support any fault tolerance guarantees, I guess this 
does improve recovery behavior. But as long as the loops can either deadlock or 
drop data, the hard guarantees are in the end still a bit weak. So that leaves 
me a bit ambivalent what to do with this pull request.



> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as 
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager start 
> block output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but 
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603





[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
To suggest some way to fix the guarantees: To my mind, the crux lies in the 
way that the feedback channel is implemented - a simple blocking queue just 
does not cut it for that case. To make this proper, I think we need to do the 
following:
  - Have an elastic feedback channel (unbounded) with a certain memory 
budget, that can spill if needed. I think it would be best implemented holding 
data serialized.
  - On checkpoint, one simply adds the feedback channel data (already 
bytes) to the checkpoint
  - The source task should probably prioritize reading from the feedback 
channel, to keep it always as small as possible.





[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1668
  
Thanks for the reminder, I went over the code today. The code looks mostly 
good, but here are some thoughts:

  - The head task supports only one concurrent checkpoint. In general, the 
tasks need to support multiple checkpoints being in progress at the same time. 
It frequently happens when people trigger savepoints concurrent to a running 
checkpoint. I think that is important to support.

  - The tail task offers the elements to the blocking queue. That means 
records are simply dropped if the capacity-bound queue (one element) is not 
polled by the head task in time.

  - With the capacity bound in the feedback queue, it is pretty easy to 
build a full deadlock. Just use a loop function that explodes data into the 
feedback channel.

  - Recent code also introduced the ability to change parallelism. What are 
the semantics here when the parallelism of the loop is changed?

Since loops did not support any fault tolerance guarantees, I guess this 
does improve recovery behavior. But as long as the loops can either deadlock or 
drop data, the hard guarantees are in the end still a bit weak. So that leaves 
me a bit ambivalent about what to do with this pull request.





[jira] [Commented] (FLINK-5223) Add documentation of UDTF in Table API & SQL

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743270#comment-15743270
 ] 

ASF GitHub Bot commented on FLINK-5223:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2956
  
Thanks for the PR @wuchong. The documentation is mostly very good.
I'll rephrase a few minor things before merging.

Thanks, Fabian


> Add documentation of UDTF in Table API & SQL
> 
>
> Key: FLINK-5223
> URL: https://issues.apache.org/jira/browse/FLINK-5223
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>






[GitHub] flink issue #2956: [FLINK-5223] [doc] Add documentation of UDTF in Table API...

2016-12-12 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2956
  
Thanks for the PR @wuchong. The documentation is mostly very good.
I'll rephrase a few minor things before merging.

Thanks, Fabian




[jira] [Comment Edited] (FLINK-4091) flink-connector-cassandra has conflicting guava version

2016-12-12 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634445#comment-15634445
 ] 

Shannon Carey edited comment on FLINK-4091 at 12/12/16 9:44 PM:


Flink's inclusion of altered Cassandra classes (though probably unavoidable) 
was causing a lot of problems for us when using a library with dependencies on 
cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job 
locally from the IDE, the two different versions of Cassandra included on the 
classpath would cause runtime errors. Excluding the two dependencies with Maven 
seems to have fixed the issue, allowing us to run our Flink jobs from the IDE 
again. Just figured I'd mention it here in case it helps anyone else. However, 
we do have to be careful in the library not to use methods that return Guava 
classes (such as asyncPrepare) because they'll be written against the 
non-shaded Guava. Relocating the Guava package with the shade plugin works when 
using the jar, but not when running Flink jobs from within the IDE (IntelliJ 
doesn't have full integration with the shade plugin).


was (Author: rehevkor5):
Flink's inclusion of altered Cassandra classes (though probably unavoidable) 
was causing a lot of problems for us when using a library with dependencies on 
cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job 
locally from the IDE, the two different versions of Cassandra classed on the 
classpath would cause runtime errors. Excluding the two dependencies with Maven 
seems to have fixed the issue, allowing us to run our Flink jobs from the IDE 
again. Just figured I'd mention it here in case it helps anyone else.

> flink-connector-cassandra has conflicting guava version
> ---
>
> Key: FLINK-4091
> URL: https://issues.apache.org/jira/browse/FLINK-4091
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
> Environment: MacOSX, 1.10-SNAPSHOT (head is 
> 1a6bab3ef76805685044cf4521e32315169f9033)
>Reporter: Dominik Bruhn
>Assignee: Chesnay Schepler
>
> The newly merged cassandra streaming connector has an issue with its guava 
> dependency.
> The build process for flink-connector-cassandra creates a shaded JAR file which 
> contains the connector, the datastax cassandra driver, plus a shaded copy of 
> guava in org.apache.flink.shaded.
> The datastax cassandra driver calls into Futures.withFallback ([1]) which is 
> present in this guava version. This also works inside the 
> flink-connector-cassandra jar.
> Now the actual build-process for Flink happens and builds another shaded JAR 
> and creates the flink-dist.jar. Inside this JAR, there is also a shaded 
> version of guava inside org.apache.flink.shaded.
> Now the issue: The guava version which is in the flink-dist.jar is not 
> compatible and doesn't contain the Futures.withFallback which the datastax 
> driver is using.
> This leads to the following issue: you can launch a Flink job which uses the 
> cassandra driver locally (through the mini-cluster) without any problems, 
> because that never uses the flink-dist.jar. 
> BUT: As soon as you are trying to start this job on a flink cluster (which 
> uses the flink-dist.jar), the job breaks with the following exception:
> https://gist.github.com/theomega/5ab9b14ffb516b15814de28e499b040d
> You can inspect this by opening the 
> flink-connector-cassandra_2.11-1.1-SNAPSHOT.jar and the 
> flink-dist_2.11-1.1-SNAPSHOT.jar in a java decompiler.
> I don't know a good solution here: Perhaps it would be one solution to shade 
> the guava for the cassandra-driver somewhere else than at 
> org.apache.flink.shaded.
> [1]: 
> https://google.github.io/guava/releases/19.0/api/docs/com/google/common/util/concurrent/Futures.html#withFallback(com.google.common.util.concurrent.ListenableFuture,
>  com.google.common.util.concurrent.FutureFallback, 
> java.util.concurrent.Executor)





[jira] [Commented] (FLINK-5187) Create analog of Row in core

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743211#comment-15743211
 ] 

ASF GitHub Bot commented on FLINK-5187:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r92037566
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
--- End diff --

`ArrayList` is unused and needs to be removed.


> Create analog of Row in core
> 
>
> Key: FLINK-5187
> URL: https://issues.apache.org/jira/browse/FLINK-5187
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>Assignee: Jark Wu
>






[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...

2016-12-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2968#discussion_r92037566
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
--- End diff --

`ArrayList` is unused and needs to be removed.




[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations

2016-12-12 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743127#comment-15743127
 ] 

Ivan Mushketyk commented on FLINK-5245:
---

Thank you for your answer, Greg.
Do I understand you correctly that "mutators" are the only questionable feature 
for BipartiteGraph and the rest are good to go from your point of view?

Regarding the duplication, I don't think there is much duplication now between 
BipartiteGraph and Graph, but this may change later when we implement other 
features. BipartiteEdge can be a superclass of the Edge class.

> Add support for BipartiteGraph mutations
> 
>
> Key: FLINK-5245
> URL: https://issues.apache.org/jira/browse/FLINK-5245
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement methods for adding and removing vertices and edges similarly to 
> Graph class.
> Depends on https://issues.apache.org/jira/browse/FLINK-2254





[jira] [Commented] (FLINK-5108) Remove ClientShutdownHook during job execution

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743050#comment-15743050
 ] 

ASF GitHub Bot commented on FLINK-5108:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2928
  
+1 for that approach


> Remove ClientShutdownHook during job execution
> --
>
> Key: FLINK-5108
> URL: https://issues.apache.org/jira/browse/FLINK-5108
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Renkai Ge
>Priority: Blocker
> Fix For: 1.2.0
>
>
> The behavior of the Standalone mode is to not react to client interrupts once 
> a job has been deployed. We should change the Yarn client implementation to 
> behave the same. This avoids accidental shutdown of the job, e.g. when the 
> user sends an interrupt via CTRL-C or when the client machine shuts down.





[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...

2016-12-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2928
  
+1 for that approach




[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations

2016-12-12 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743017#comment-15743017
 ] 

Greg Hogan commented on FLINK-5245:
---

In addition to being useful, the API must promote efficiency and best practices. 
The add/remove methods are lacking since 1) the vertices/edges must be known a 
priori, 2) the methods are not scalable, and 3) users may have the idea to call 
these methods inside a local for loop.

We should rethink the API as features are added. I would like to see 
`getUndirected()` and `getDegrees()` reimplemented.

I like that we're adding a lot of bipartite graph functionality. How can we 
best avoid duplication of code? Perhaps Edge should be a specialization of 
BipartiteEdge and Graph a specialization of BipartiteGraph (the names would 
change, of course). Implementing some algorithms (such as redundancy from the 
two-mode paper) may help answer these questions.

> Add support for BipartiteGraph mutations
> 
>
> Key: FLINK-5245
> URL: https://issues.apache.org/jira/browse/FLINK-5245
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement methods for adding and removing vertices and edges similarly to 
> Graph class.
> Depends on https://issues.apache.org/jira/browse/FLINK-2254





[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742957#comment-15742957
 ] 

ASF GitHub Bot commented on FLINK-5113:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2939
  
It should be part of this PR since this case simply didn't exist before the 
new interface. The equivalent to no state was the state being null, at which 
point restore was never called in the first place. Now, if the state is null we 
get an empty list, afaik.

Here is what is confusing me: Every single function checks whether the 
state is empty. Every one. So, there is apparently the possibility that it's 
empty. But the behavior for that case does not seem well-defined.

According to the code receiving an empty state list is not a reason to fail 
for any of these tests.

If this is the case, we don't need to actually implement `restoreState` in 
the first place, since it is irrelevant to the result of the test.
If this is not the case, we should try to fail as early as possible by 
adding a failure condition.
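
A sketch of the fail-early variant (hedged: the field, descriptor, and exact 
state-store accessor are illustrative, and the new CheckpointedFunction API may 
differ slightly between versions):
{code}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // 'counts' and 'descriptor' are hypothetical members of the test function.
    counts = context.getOperatorStateStore().getListState(descriptor);
    if (context.isRestored() && !counts.get().iterator().hasNext()) {
        // Restored from a checkpoint but got an empty state list: fail at once
        // instead of silently running the test with no state.
        throw new IllegalStateException("function was restored without any state");
    }
}
{code}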


> Make all Testing Functions implement CheckpointedFunction Interface.
> 
>
> Key: FLINK-5113
> URL: https://issues.apache.org/jira/browse/FLINK-5113
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently stateful functions implement the (old) Checkpointed interface.
> This is issue aims at porting all these function to the new 
> CheckpointedFunction interface, so that they can leverage the new 
> capabilities by it. 





[GitHub] flink issue #2939: [FLINK-5113] Ports all functions in the tests to the new ...

2016-12-12 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2939
  
It should be part of this PR since this case simply didn't exist before the 
new interface. The equivalent to no state was the state being null, at which 
point restore was never called in the first place. Now, if the state is null we 
get an empty list, afaik.

Here is what is confusing me: Every single function checks whether the 
state is empty. Every one. So, there is apparently the possibility that it's 
empty. But the behavior for that case does not seem well-defined.

According to the code receiving an empty state list is not a reason to fail 
for any of these tests.

If this is the case, we don't need to actually implement `restoreState` in 
the first place, since it is irrelevant to the result of the test.
If this is not the case, we should try to fail as early as possible by 
adding a failure condition.




[jira] [Updated] (FLINK-5189) Deprecate table.Row

2016-12-12 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5189:
-
Summary: Deprecate table.Row  (was: Delete table.Row)

> Deprecate table.Row
> ---
>
> Key: FLINK-5189
> URL: https://issues.apache.org/jira/browse/FLINK-5189
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>






[jira] [Commented] (FLINK-5187) Create analog of Row in core

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742935#comment-15742935
 ] 

ASF GitHub Bot commented on FLINK-5187:
---

Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2968
  
@fhueske Yes, I already started 
https://github.com/tonycox/flink/tree/FLINK-5188


> Create analog of Row in core
> 
>
> Key: FLINK-5187
> URL: https://issues.apache.org/jira/browse/FLINK-5187
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>Assignee: Jark Wu
>






[GitHub] flink issue #2968: [FLINK-5187] [core] Create analog of Row and RowTypeInfo ...

2016-12-12 Thread tonycox
Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2968
  
@fhueske Yes, I already started 
https://github.com/tonycox/flink/tree/FLINK-5188




[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes

2016-12-12 Thread Alexander Chermenin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742901#comment-15742901
 ] 

Alexander Chermenin commented on FLINK-5319:


A simpler test with the same result:
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> longDataSet = env.fromCollection(Arrays.asList(1L, 2L, 3L, 4L, 5L));
DataSet<Integer> intDataSet = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
longDataSet.map(Number::doubleValue).print();
intDataSet.map(Number::doubleValue).print();
{code}


> ClassCastException when reusing an inherited method reference as KeySelector 
> for different classes
> --
>
> Key: FLINK-5319
> URL: https://issues.apache.org/jira/browse/FLINK-5319
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Alexander Chermenin
>
> Code sample:
> {code}static abstract class A {
> int id;
> A(int id) {this.id = id; }
> int getId() { return id; }
> }
> static class B extends A { B(int id) { super(id % 3); } }
> static class C extends A { C(int id) { super(id % 2); } }
> private static B b(int id) { return new B(id); }
> private static C c(int id) { return new C(id); }
> /**
>  * Main method.
>  */
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment environment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
> C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);
> DataStreamSource<B> bStream = environment.fromElements(bs);
> DataStreamSource<C> cStream = environment.fromElements(cs);
> bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
> cStream.keyBy((KeySelector<C, Integer>) A::getId).print();
> environment.execute();
> }
> {code}
> This code throws next exception:
> {code}Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> 

[jira] [Commented] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv

2016-12-12 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742870#comment-15742870
 ] 

Chesnay Schepler commented on FLINK-5322:
-

Could you try whether this works:

{code}
yarn.taskmanager.env.MY_ENV: test
{code}

> yarn.taskmanager.env value does not appear in System.getenv
> ---
>
> Key: FLINK-5322
> URL: https://issues.apache.org/jira/browse/FLINK-5322
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
> Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
>Reporter: Shannon Carey
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The value I specified in flink-conf.yaml
> {code}
> yarn.taskmanager.env:
>   MY_ENV: test
> {code}
> is not available in {{System.getenv("MY_ENV")}} from the plan execution 
> (execution flow of main method) nor from within execution of a streaming 
> operator.
> Interestingly, it does appear within the Flink JobManager Web UI under Job 
> Manager -> Configuration.





[jira] [Closed] (FLINK-4631) NullPointerException during stream task cleanup

2016-12-12 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4631.
---
Resolution: Fixed

> NullPointerException during stream task cleanup
> ---
>
> Key: FLINK-4631
> URL: https://issues.apache.org/jira/browse/FLINK-4631
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
> Environment: Ubuntu server 12.04.5 64 bit
> java version "1.8.0_40"
> Java(TM) SE Runtime Environment (build 1.8.0_40-b26)
> Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode)
>Reporter: Avihai Berkovitz
> Fix For: 1.2.0
>
>
> If a streaming job failed during startup (in my case, due to lack of network 
> buffers), all the tasks are being cancelled before they started. This causes 
> many instances of the following exception:
> {noformat}
> 2016-09-18 14:17:12,177 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> cleanup of stream task
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}





[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes

2016-12-12 Thread Alexander Chermenin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742827#comment-15742827
 ] 

Alexander Chermenin commented on FLINK-5319:


There are no problems if the code is used with only B or only C.

> ClassCastException when reusing an inherited method reference as KeySelector 
> for different classes
> --
>
> Key: FLINK-5319
> URL: https://issues.apache.org/jira/browse/FLINK-5319
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Alexander Chermenin
>
> Code sample:
> {code}static abstract class A {
> int id;
> A(int id) {this.id = id; }
> int getId() { return id; }
> }
> static class B extends A { B(int id) { super(id % 3); } }
> static class C extends A { C(int id) { super(id % 2); } }
> private static B b(int id) { return new B(id); }
> private static C c(int id) { return new C(id); }
> /**
>  * Main method.
>  */
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment environment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
> C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);
> DataStreamSource<B> bStream = environment.fromElements(bs);
> DataStreamSource<C> cStream = environment.fromElements(cs);
> bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
> cStream.keyBy((KeySelector<C, Integer>) A::getId).print();
> environment.execute();
> }
> {code}
> This code throws next exception:
> {code}Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>   at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>   at 
> 

[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations

2016-12-12 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742773#comment-15742773
 ] 

Ivan Mushketyk commented on FLINK-5245:
---

Are these methods useful for the Graph class?

Here is a list of BipartiteGraph features that are pending implementation or 
review:
* Graph mutation method
* Accessors
* CSV readers
* Validators
* Generators
* Metrics
* Graph properties
* Graph transformations
* Scala interface
* Bipartite graph examples

What do you think is useful for BipartiteGraph? Is there a case where a feature 
is useful for the Graph class, but useless for the BipartiteGraph class?


> Add support for BipartiteGraph mutations
> 
>
> Key: FLINK-5245
> URL: https://issues.apache.org/jira/browse/FLINK-5245
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement methods for adding and removing vertices and edges similarly to 
> Graph class.
> Depends on https://issues.apache.org/jira/browse/FLINK-2254





[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations

2016-12-12 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742744#comment-15742744
 ] 

Vasia Kalavri commented on FLINK-5245:
--

I don't think so. We used to have some simple examples that showcased how to 
create a graph in this way, but I don't think we really need such methods for 
the bipartite graph. That said, we should probably go through all the bipartite 
features and decide whether they are useful, e.g. validators and generators. Do 
they even make sense for bipartite graphs? Or when do they?

> Add support for BipartiteGraph mutations
> 
>
> Key: FLINK-5245
> URL: https://issues.apache.org/jira/browse/FLINK-5245
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement methods for adding and removing vertices and edges similarly to 
> Graph class.
> Depends on https://issues.apache.org/jira/browse/FLINK-2254





[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations

2016-12-12 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742726#comment-15742726
 ] 

Greg Hogan commented on FLINK-5245:
---

[~vkalavri] are these methods used outside of tests?

> Add support for BipartiteGraph mutations
> 
>
> Key: FLINK-5245
> URL: https://issues.apache.org/jira/browse/FLINK-5245
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement methods for adding and removing vertices and edges similarly to 
> Graph class.
> Depends on https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2986: [FLINK-4648] Implement BipartiteGraph generator

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92007073
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
--- End diff --

As with validation, we should consider packaging these under the 
`bipartite` package.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2986: [FLINK-4648] Implement BipartiteGraph generator

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92006696
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generate a complete bipartite graph where every top node is connected
+ * to every bottom node.
+ */
+public class CompleteBipartiteGraph
+   implements BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> {
+
+   // Required to create the DataSource
+   private final ExecutionEnvironment env;
+
+   // Required configuration
+   private final long topVertexCount;
+   private final long bottomVertexCount;
+
+   private int parallelism = 1;
--- End diff --

Use `PARALLELISM_DEFAULT`. We used to use `-1`.
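
    For reference, a minimal sketch of the suggested change, assuming the 
constant from `org.apache.flink.api.common.ExecutionConfig` (this is not the 
PR's code):

{code}
// Use the framework constant (-1) instead of a hard-coded parallelism of 1.
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
{code}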


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4648) Implement bipartite graph generators

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742705#comment-15742705
 ] 

ASF GitHub Bot commented on FLINK-4648:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92006686
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generate a complete bipartite graph where every top node is connected
+ * to every bottom node.
+ */
+public class CompleteBipartiteGraph
+   implements BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> {
+
+   // Required to create the DataSource
+   private final ExecutionEnvironment env;
+
+   // Required configuration
+   private final long topVertexCount;
+   private final long bottomVertexCount;
+
+   private int parallelism = 1;
+
+   public CompleteBipartiteGraph(ExecutionEnvironment env, long 
topVertexCount, long bottomVertexCount) {
+   this.env = env;
+   this.topVertexCount = topVertexCount;
+   this.bottomVertexCount = bottomVertexCount;
+   }
+
+   @Override
+   public BipartiteGraph<LongValue, LongValue, NullValue, NullValue, NullValue> generate() {
+   DataSet<Vertex<LongValue, NullValue>> topVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, topVertexCount);
+   DataSet<Vertex<LongValue, NullValue>> bottomVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, bottomVertexCount);
+
+   DataSet<BipartiteEdge<LongValue, LongValue, NullValue>> edges = topVertices.cross(bottomVertices)
--- End diff --

Can implement `EdgeGenerator` as a `flatMap` instead of using a `cross`, as 
in `CompleteGraph`.
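
    For illustration, a hedged sketch of the flatMap variant (the 
FlatMapFunction-based `EdgeGenerator` and the `BipartiteEdge` constructor used 
here are assumptions, not the PR's code):

{code}
// Emit all edges of one top vertex per input record, avoiding the
// materialized cross product.
private static class EdgeGenerator
	implements FlatMapFunction<Vertex<LongValue, NullValue>,
		BipartiteEdge<LongValue, LongValue, NullValue>> {

	private final long bottomVertexCount;

	EdgeGenerator(long bottomVertexCount) {
		this.bottomVertexCount = bottomVertexCount;
	}

	@Override
	public void flatMap(Vertex<LongValue, NullValue> top,
			Collector<BipartiteEdge<LongValue, LongValue, NullValue>> out) {
		for (long i = 0; i < bottomVertexCount; i++) {
			// assumes a BipartiteEdge(topId, bottomId, value) constructor
			out.collect(new BipartiteEdge<>(top.getId(),
				new LongValue(i), NullValue.getInstance()));
		}
	}
}
{code}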


> Implement bipartite graph generators
> 
>
> Key: FLINK-4648
> URL: https://issues.apache.org/jira/browse/FLINK-4648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement generators for bipartite graphs.
> Should implement at least:
> * *BipartiteGraphGenerator* (maybe requires a better name) that will generate 
> a bipartite graph where every vertex of one set is connected only to some 
> vertices from another set
> * *CompleteBipartiteGraphGenerator* that will generate a graph where every 
> vertex of one set is connected to every vertex of another set



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4648) Implement bipartite graph generators

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742702#comment-15742702
 ] 

ASF GitHub Bot commented on FLINK-4648:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92006696
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generate a complete bipartite graph where every top node is connected
+ * to every bottom node.
+ */
+public class CompleteBipartiteGraph
+   implements BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> {
+
+   // Required to create the DataSource
+   private final ExecutionEnvironment env;
+
+   // Required configuration
+   private final long topVertexCount;
+   private final long bottomVertexCount;
+
+   private int parallelism = 1;
--- End diff --

Use `PARALLELISM_DEFAULT`. We used to use `-1`.


> Implement bipartite graph generators
> 
>
> Key: FLINK-4648
> URL: https://issues.apache.org/jira/browse/FLINK-4648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement generators for bipartite graphs.
> Should implement at least:
> * *BipartiteGraphGenerator* (maybe requires a better name) that will generate 
> a bipartite graph where every vertex of one set is connected only to some 
> vertices from another set
> * *CompleteBipartiteGraphGenerator* that will generate a graph where every 
> vertex of one set is connected to every vertex of another set



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4648) Implement bipartite graph generators

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742703#comment-15742703
 ] 

ASF GitHub Bot commented on FLINK-4648:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92007172
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generate a complete bipartite graph where every top node is connected
+ * to every bottom node.
+ */
+public class CompleteBipartiteGraph
+   implements BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> {
+
+   // Required to create the DataSource
+   private final ExecutionEnvironment env;
+
+   // Required configuration
+   private final long topVertexCount;
+   private final long bottomVertexCount;
+
+   private int parallelism = 1;
+
+   public CompleteBipartiteGraph(ExecutionEnvironment env, long 
topVertexCount, long bottomVertexCount) {
+   this.env = env;
+   this.topVertexCount = topVertexCount;
+   this.bottomVertexCount = bottomVertexCount;
+   }
+
+   @Override
+   public BipartiteGraph<LongValue, LongValue, NullValue, NullValue, NullValue> generate() {
+   DataSet<Vertex<LongValue, NullValue>> topVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, topVertexCount);
+   DataSet<Vertex<LongValue, NullValue>> bottomVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, bottomVertexCount);
+
+   DataSet<BipartiteEdge<LongValue, LongValue, NullValue>> edges = topVertices.cross(bottomVertices)
+       .setParallelism(parallelism)
+       .map(new EdgeGenerator())
+       .setParallelism(parallelism);
+
+   return BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
+   }
+
+   @Override
+   public BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> setParallelism(int parallelism) {
--- End diff --

Move this to an `AbstractBipartiteGraphGenerator` as with 
`AbstractGraphGenerator`?
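
    For illustration, such a base class might look like the following sketch, 
by analogy with `AbstractGraphGenerator` (the type parameters of 
`BipartiteGraphGenerator` are an assumption here):

{code}
// Hypothetical base class that centralizes the shared parallelism handling.
public abstract class AbstractBipartiteGraphGenerator<KT, KB, VVT, VVB, EV>
	implements BipartiteGraphGenerator<KT, KB, VVT, VVB, EV> {

	protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

	@Override
	public BipartiteGraphGenerator<KT, KB, VVT, VVB, EV> setParallelism(int parallelism) {
		this.parallelism = parallelism;
		return this;
	}
}
{code}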


> Implement bipartite graph generators
> 
>
> Key: FLINK-4648
> URL: https://issues.apache.org/jira/browse/FLINK-4648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement generators for bipartite graphs.
> Should implement at least:
> * *BipartiteGraphGenerator* (maybe requires a better name) that will generate 
> a bipartite graph where every vertex of one set is connected only to some 
> vertices from another set
> * *CompleteBipartiteGraphGenerator* that will generate a graph where every 
> vertex of one set is connected to every vertex of another set



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4648) Implement bipartite graph generators

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742704#comment-15742704
 ] 

ASF GitHub Bot commented on FLINK-4648:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92007073
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
--- End diff --

As with validation, we should consider packaging these under the 
`bipartite` package.


> Implement bipartite graph generators
> 
>
> Key: FLINK-4648
> URL: https://issues.apache.org/jira/browse/FLINK-4648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement generators for bipartite graphs.
> Should implement at least:
> * *BipartiteGraphGenerator* (maybe requires a better name) that will generate 
> a bipartite graph where every vertex of one set is connected only to some 
> vertices from another set
> * *CompleteBipartiteGraphGenerator* that will generate a graph where every 
> vertex of one set is connected to every vertex of another set



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2986: [FLINK-4648] Implement BipartiteGraph generator

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92007172
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generate a complete bipartite graph where every top node is connected
+ * to every bottom node.
+ */
+public class CompleteBipartiteGraph
+   implements BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> {
+
+   // Required to create the DataSource
+   private final ExecutionEnvironment env;
+
+   // Required configuration
+   private final long topVertexCount;
+   private final long bottomVertexCount;
+
+   private int parallelism = 1;
+
+   public CompleteBipartiteGraph(ExecutionEnvironment env, long 
topVertexCount, long bottomVertexCount) {
+   this.env = env;
+   this.topVertexCount = topVertexCount;
+   this.bottomVertexCount = bottomVertexCount;
+   }
+
+   @Override
+   public BipartiteGraph<LongValue, LongValue, NullValue, NullValue, NullValue> generate() {
+   DataSet<Vertex<LongValue, NullValue>> topVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, topVertexCount);
+   DataSet<Vertex<LongValue, NullValue>> bottomVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, bottomVertexCount);
+
+   DataSet<BipartiteEdge<LongValue, LongValue, NullValue>> edges = topVertices.cross(bottomVertices)
+       .setParallelism(parallelism)
+       .map(new EdgeGenerator())
+       .setParallelism(parallelism);
+
+   return BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
+   }
+
+   @Override
+   public BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> setParallelism(int parallelism) {
--- End diff --

Move this to an `AbstractBipartiteGraphGenerator` as with 
`AbstractGraphGenerator`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2986: [FLINK-4648] Implement BipartiteGraph generator

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2986#discussion_r92006686
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generate a complete bipartite graph where every top node is connected
+ * to every bottom node.
+ */
+public class CompleteBipartiteGraph
+   implements BipartiteGraphGenerator<LongValue, LongValue, NullValue, NullValue, NullValue> {
+
+   // Required to create the DataSource
+   private final ExecutionEnvironment env;
+
+   // Required configuration
+   private final long topVertexCount;
+   private final long bottomVertexCount;
+
+   private int parallelism = 1;
+
+   public CompleteBipartiteGraph(ExecutionEnvironment env, long 
topVertexCount, long bottomVertexCount) {
+   this.env = env;
+   this.topVertexCount = topVertexCount;
+   this.bottomVertexCount = bottomVertexCount;
+   }
+
+   @Override
+   public BipartiteGraph<LongValue, LongValue, NullValue, NullValue, NullValue> generate() {
+   DataSet<Vertex<LongValue, NullValue>> topVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, topVertexCount);
+   DataSet<Vertex<LongValue, NullValue>> bottomVertices
+       = GraphGeneratorUtils.vertexSequence(env, parallelism, bottomVertexCount);
+
+   DataSet<BipartiteEdge<LongValue, LongValue, NullValue>> edges = topVertices.cross(bottomVertices)
--- End diff --

Can implement `EdgeGenerator` as a `flatMap` instead of using a `cross`, as 
in `CompleteGraph`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742663#comment-15742663
 ] 

ASF GitHub Bot commented on FLINK-5104:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r92004487
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.util.Collector;
+import static 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Checks that the edge set input contains valid vertex Ids, i.e. that they
+ * also exist in top and bottom vertex sets.
+ */
+public class InvalidBipartiteVertexIdsValidator<KT, KB, VVT, VVB, EV>
+   extends BipartiteGraphValidator<KT, KB, VVT, VVB, EV> {
+
+   @Override
+   public boolean validate(BipartiteGraph<KT, KB, VVT, VVB, EV> bipartiteGraph) throws Exception {
+   DataSet<Tuple1<KT>> edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap());
--- End diff --

Can `GetTopIdsMap` be replaced with `.project(0)`?
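
    For illustration, the suggested replacement could read as follows, 
assuming `BipartiteEdge` is a Tuple subtype whose top and bottom vertex ids 
are fields 0 and 1:

{code}
// Projection keeps only the id field and makes the MapFunction classes
// unnecessary.
DataSet<Tuple1<KT>> edgesTopIds = bipartiteGraph.getEdges().<Tuple1<KT>>project(0);
DataSet<Tuple1<KB>> edgesBottomIds = bipartiteGraph.getEdges().<Tuple1<KB>>project(1);
{code}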


> Implement BipartiteGraph validator
> --
>
> Key: FLINK-5104
> URL: https://issues.apache.org/jira/browse/FLINK-5104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> BipartiteGraph should have a validator similar to GraphValidator for Graph 
> class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742661#comment-15742661
 ] 

ASF GitHub Bot commented on FLINK-5104:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r91994405
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/BipartiteGraphValidator.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
--- End diff --

Move to `org.apache.flink.graph.bipartite.validation`?


> Implement BipartiteGraph validator
> --
>
> Key: FLINK-5104
> URL: https://issues.apache.org/jira/browse/FLINK-5104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> BipartiteGraph should have a validator similar to GraphValidator for Graph 
> class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742662#comment-15742662
 ] 

ASF GitHub Bot commented on FLINK-5104:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r91997234
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.util.Collector;
+import static 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Checks that the edge set input contains valid vertex Ids, i.e. that they
+ * also exist in top and bottom vertex sets.
+ */
+public class InvalidBipartiteVertexIdsValidator<KT, KB, VVT, VVB, EV>
+   extends BipartiteGraphValidator<KT, KB, VVT, VVB, EV> {
+
+   @Override
+   public boolean validate(BipartiteGraph<KT, KB, VVT, VVB, EV> bipartiteGraph) throws Exception {
+   DataSet<Tuple1<KT>> edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap());
+   DataSet<Tuple1<KB>> edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap());
+
+   DataSet<KT> invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds);
+   DataSet<KB> invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds);
+
+   return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0;
+   }
+
+   private <K, V> DataSet<K> invalidIds(DataSet<Vertex<K, V>> topVertices, DataSet<Tuple1<K>> edgesIds) {
+   return topVertices.coGroup(edgesIds)
+       .where(0)
+       .equalTo(0)
+       .with(new CoGroupFunction<Vertex<K, V>, Tuple1<K>, K>() {
+       @Override
+       public void coGroup(Iterable<Vertex<K, V>> vertices, Iterable<Tuple1<K>> edgeIds, Collector<K> out) throws Exception {
+           if (!vertices.iterator().hasNext()) {
+               out.collect(edgeIds.iterator().next().f0);
+           }
+           }
+       });
+   }
+
+   @ForwardedFields("f0")
+   private class GetTopIdsMap implements MapFunction<BipartiteEdge<KT, KB, EV>, Tuple1<KT>> {
--- End diff --

Inner Flink functions should be `static` to prevent serialization of the 
outer class.
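
    A minimal sketch of the suggested fix (the same function, declared static 
so the enclosing validator instance is not pulled into the serialized 
closure; the map body assumes the top vertex id is the edge's first tuple 
field):

{code}
@ForwardedFields("f0")
private static class GetTopIdsMap<KT, KB, EV>
	implements MapFunction<BipartiteEdge<KT, KB, EV>, Tuple1<KT>> {

	@Override
	public Tuple1<KT> map(BipartiteEdge<KT, KB, EV> edge) {
		// assumes the top vertex id is stored as field f0 of the edge
		return new Tuple1<>(edge.f0);
	}
}
{code}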


> Implement BipartiteGraph validator
> --
>
> Key: FLINK-5104
> URL: https://issues.apache.org/jira/browse/FLINK-5104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> BipartiteGraph should have a validator similar to GraphValidator for Graph 
> class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742664#comment-15742664
 ] 

ASF GitHub Bot commented on FLINK-5104:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r91996140
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.util.Collector;
+import static 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Checks that the edge set input contains valid vertex Ids, i.e. that they
+ * also exist in top and bottom vertex sets.
+ */
+public class InvalidBipartiteVertexIdsValidator<KT, KB, VVT, VVB, EV>
+   extends BipartiteGraphValidator<KT, KB, VVT, VVB, EV> {
+
+   @Override
+   public boolean validate(BipartiteGraph<KT, KB, VVT, VVB, EV> bipartiteGraph) throws Exception {
+   DataSet<Tuple1<KT>> edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap());
+   DataSet<Tuple1<KB>> edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap());
+
+   DataSet<KT> invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds);
+   DataSet<KB> invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds);
+
+   return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0;
--- End diff --

`count()` executes the program so shouldn't be called twice. Could use an 
accumulator in `RichOutputFormat` instead.
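
    A hedged sketch of the accumulator approach (class and accumulator names 
are illustrative, not from the PR):

{code}
// Count records in a sink so that a single execute() produces both counts
// as accumulator results.
private static class CountingSink<T> extends RichOutputFormat<T> {

	private final String accumulatorName;
	private LongCounter counter;

	CountingSink(String accumulatorName) {
		this.accumulatorName = accumulatorName;
	}

	@Override
	public void configure(Configuration parameters) {}

	@Override
	public void open(int taskNumber, int numTasks) {
		counter = getRuntimeContext().getLongCounter(accumulatorName);
	}

	@Override
	public void writeRecord(T record) {
		counter.add(1L);
	}

	@Override
	public void close() {}
}

// Usage: attach one sink per invalid-id data set, run the plan once, and
// read both counts from the JobExecutionResult:
//   invalidTopIds.output(new CountingSink<KT>("invalid-top-ids"));
//   invalidBottomIds.output(new CountingSink<KB>("invalid-bottom-ids"));
//   JobExecutionResult result = env.execute();
//   long invalidTop = result.<Long>getAccumulatorResult("invalid-top-ids");
{code}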


> Implement BipartiteGraph validator
> --
>
> Key: FLINK-5104
> URL: https://issues.apache.org/jira/browse/FLINK-5104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> BipartiteGraph should have a validator similar to GraphValidator for Graph 
> class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r91994405
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/BipartiteGraphValidator.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
--- End diff --

Move to `org.apache.flink.graph.bipartite.validation`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r91997234
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.util.Collector;
+import static 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Checks that the edge set input contains valid vertex Ids, i.e. that they
+ * also exist in top and bottom vertex sets.
+ */
+public class InvalidBipartiteVertexIdsValidator<KT, KB, VVT, VVB, EV>
+   extends BipartiteGraphValidator<KT, KB, VVT, VVB, EV> {
+
+   @Override
+   public boolean validate(BipartiteGraph<KT, KB, VVT, VVB, EV> bipartiteGraph) throws Exception {
+   DataSet<Tuple1<KT>> edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap());
+   DataSet<Tuple1<KB>> edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap());
+
+   DataSet<KT> invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds);
+   DataSet<KB> invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds);
+
+   return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0;
+   }
+
+   private <K, V> DataSet<K> invalidIds(DataSet<Vertex<K, V>> topVertices, DataSet<Tuple1<K>> edgesIds) {
+   return topVertices.coGroup(edgesIds)
+       .where(0)
+       .equalTo(0)
+       .with(new CoGroupFunction<Vertex<K, V>, Tuple1<K>, K>() {
+       @Override
+       public void coGroup(Iterable<Vertex<K, V>> vertices, Iterable<Tuple1<K>> edgeIds, Collector<K> out) throws Exception {
+           if (!vertices.iterator().hasNext()) {
+               out.collect(edgeIds.iterator().next().f0);
+           }
+           }
+       });
+   }
+
+   @ForwardedFields("f0")
+   private class GetTopIdsMap implements MapFunction<BipartiteEdge<KT, KB, EV>, Tuple1<KT>> {
--- End diff --

Inner Flink functions should be `static` to prevent serialization of the 
outer class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r91996140
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.util.Collector;
+import static 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Checks that the edge set input contains valid vertex Ids, i.e. that they
+ * also exist in top and bottom vertex sets.
+ */
+public class InvalidBipartiteVertexIdsValidator<KT, KB, VVT, VVB, EV>
+   extends BipartiteGraphValidator<KT, KB, VVT, VVB, EV> {
+
+   @Override
+   public boolean validate(BipartiteGraph<KT, KB, VVT, VVB, EV> bipartiteGraph) throws Exception {
+   DataSet<Tuple1<KT>> edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap());
+   DataSet<Tuple1<KB>> edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap());
+
+   DataSet<KT> invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds);
+   DataSet<KB> invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds);
+
+   return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0;
--- End diff --

`count()` executes the program so shouldn't be called twice. Could use an 
accumulator in `RichOutputFormat` instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation

2016-12-12 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2985#discussion_r92004487
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.apache.flink.graph.bipartite.BipartiteGraph;
+import org.apache.flink.util.Collector;
+import static 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Checks that the edge set input contains valid vertex Ids, i.e. that they
+ * also exist in top and bottom vertex sets.
+ */
+public class InvalidBipartiteVertexIdsValidator<KT, KB, VVT, VVB, EV>
+   extends BipartiteGraphValidator<KT, KB, VVT, VVB, EV> {
+
+   @Override
+   public boolean validate(BipartiteGraph<KT, KB, VVT, VVB, EV> bipartiteGraph) throws Exception {
+   DataSet<Tuple1<KT>> edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap());
--- End diff --

Can `GetTopIdsMap` be replaced with `.project(0)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-12-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742647#comment-15742647
 ] 

Robert Metzger edited comment on FLINK-4858 at 12/12/16 6:11 PM:
-

Okay, thank you for your input. I'll remove 1.2 as a target fix version.


was (Author: rmetzger):
Okay, thank you for your input. I'lll remove 1.2 as a target fix version.

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-12-12 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4858:
--
Component/s: State Backends, Checkpointing

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-12-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742647#comment-15742647
 ] 

Robert Metzger commented on FLINK-4858:
---

Okay, thank you for your input. I'lll remove 1.2 as a target fix version.

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv

2016-12-12 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-5322:
-
Description: 
The value I specified in flink-conf.yaml

{code}
yarn.taskmanager.env:
  MY_ENV: test
{code}

is not available in {{System.getenv("MY_ENV")}} from the plan execution 
(execution flow of main method) nor from within execution of a streaming 
operator.

Interestingly, it does appear within the Flink JobManager Web UI under Job 
Manager -> Configuration.
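
For clarity, the check that fails is simply (illustrative):

{code}
// Runs in main() (plan construction) and inside operators: expected "test",
// but observed null when deployed on YARN.
String myEnv = System.getenv("MY_ENV");
{code}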

  was:
The value I specified in flink-conf.yaml

{code}
yarn.taskmanager.env:
  MY_ENV: test
{code}

is not available in {{System.getenv("MY_ENV")}} from the plan execution 
(execution flow of main method) nor from within execution of a streaming 
operator.



> yarn.taskmanager.env value does not appear in System.getenv
> ---
>
> Key: FLINK-5322
> URL: https://issues.apache.org/jira/browse/FLINK-5322
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
> Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
>Reporter: Shannon Carey
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The value I specified in flink-conf.yaml
> {code}
> yarn.taskmanager.env:
>   MY_ENV: test
> {code}
> is not available in {{System.getenv("MY_ENV")}} from the plan execution 
> (execution flow of main method) nor from within execution of a streaming 
> operator.
> Interestingly, it does appear within the Flink JobManager Web UI under Job 
> Manager -> Configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-12-12 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4858:
--
Fix Version/s: (was: 1.2.0)

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv

2016-12-12 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5322:


 Summary: yarn.taskmanager.env value does not appear in 
System.getenv
 Key: FLINK-5322
 URL: https://issues.apache.org/jira/browse/FLINK-5322
 Project: Flink
  Issue Type: Bug
  Components: YARN
 Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
Reporter: Shannon Carey
Priority: Trivial
 Fix For: 1.1.3


The value I specified in flink-conf.yaml

{code}
yarn.taskmanager.env:
  MY_ENV: test
{code}

is not available in {{System.getenv("MY_ENV")}} from the plan execution 
(execution flow of main method) nor from within execution of a streaming 
operator.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5290) Ensure backwards compatibility of the hashes used to generate JobVertexIds

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742637#comment-15742637
 ] 

ASF GitHub Bot commented on FLINK-5290:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2966#discussion_r92004254
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java
 ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+   /**
+* Returns a map with a hash for each {@link StreamNode} of the {@link
+* StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+* identify nodes across job submissions if they didn't change.
+*
+* <p>The complete {@link StreamGraph} is traversed. The hash is either
+* computed from the transformation's user-specified id (see
+* {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+*
+* <p>The generated hash is deterministic with respect to:
+* <ul>
+* <li>node-local properties (like parallelism, UDF, node ID),
+* <li>chained output nodes, and
+* <li>input nodes hashes
+* </ul>
+*
+* @return A map from {@link StreamNode#id} to hash as 16-byte array.
+*/
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+   // The hash function used to generate the hash
+   final HashFunction hashFunction = Hashing.murmur3_128(0);
+   final Map<Integer, byte[]> hashes = new HashMap<>();
+
+   Set visited = new HashSet<>();
+   Queue remaining = new ArrayDeque<>();
+
+   // We need to make the source order deterministic. The source 
IDs are
+   // not returned in the same order, which means that submitting 
the same
+   // program twice might result in different traversal, which 
breaks the
+   // deterministic hash assignment.
+   List sources = new ArrayList<>();
+   for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+   sources.add(sourceNodeId);
+   }
+   Collections.sort(sources);
+
+   //
+   // Traverse the graph in a breadth-first manner. Keep in mind 
that
+   // the graph is not a tree and multiple paths to nodes can 
exist.
+   //
+
+   // Start with source nodes
+   for (Integer sourceNodeId : sources) {
+   remaining.add(streamGraph.getStreamNode(sourceNodeId));
+   visited.add(sourceNodeId);
+

[GitHub] flink pull request #2966: [FLINK-5290] Ensure backwards compatibility of the...

2016-12-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2966#discussion_r92004254
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java
 ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+   /**
+    * Returns a map with a hash for each {@link StreamNode} of the {@link
+    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+    * identify nodes across job submissions if they didn't change.
+    *
+    * <p>The complete {@link StreamGraph} is traversed. The hash is either
+    * computed from the transformation's user-specified id (see
+    * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+    *
+    * <p>The generated hash is deterministic with respect to:
+    * <ul>
+    * <li>node-local properties (like parallelism, UDF, node ID),
+    * <li>chained output nodes, and
+    * <li>input nodes hashes
+    * </ul>
+    *
+    * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+    */
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+      // The hash function used to generate the hash
+      final HashFunction hashFunction = Hashing.murmur3_128(0);
+      final Map<Integer, byte[]> hashes = new HashMap<>();
+
+      Set<Integer> visited = new HashSet<>();
+      Queue<StreamNode> remaining = new ArrayDeque<>();
+
+      // We need to make the source order deterministic. The source IDs are
+      // not returned in the same order, which means that submitting the same
+      // program twice might result in different traversal, which breaks the
+      // deterministic hash assignment.
+      List<Integer> sources = new ArrayList<>();
+      for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+         sources.add(sourceNodeId);
+      }
+      Collections.sort(sources);
+
+      //
+      // Traverse the graph in a breadth-first manner. Keep in mind that
+      // the graph is not a tree and multiple paths to nodes can exist.
+      //
+
+      // Start with source nodes
+      for (Integer sourceNodeId : sources) {
+         remaining.add(streamGraph.getStreamNode(sourceNodeId));
+         visited.add(sourceNodeId);
+      }
+
+      StreamNode currentNode;
+      while ((currentNode = remaining.poll()) != null) {
+         // Generate the hash code. Because multiple path exist to each
+         //

[jira] [Commented] (FLINK-5290) Ensure backwards compatibility of the hashes used to generate JobVertexIds

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742622#comment-15742622
 ] 

ASF GitHub Bot commented on FLINK-5290:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2966#discussion_r92003177
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java
 ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+   /**
+    * Returns a map with a hash for each {@link StreamNode} of the {@link
+    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+    * identify nodes across job submissions if they didn't change.
+    *
+    * <p>The complete {@link StreamGraph} is traversed. The hash is either
+    * computed from the transformation's user-specified id (see
+    * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+    *
+    * <p>The generated hash is deterministic with respect to:
+    * <ul>
+    * <li>node-local properties (like parallelism, UDF, node ID),
+    * <li>chained output nodes, and
+    * <li>input nodes hashes
+    * </ul>
+    *
+    * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+    */
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+      // The hash function used to generate the hash
+      final HashFunction hashFunction = Hashing.murmur3_128(0);
+      final Map<Integer, byte[]> hashes = new HashMap<>();
+
+      Set<Integer> visited = new HashSet<>();
+      Queue<StreamNode> remaining = new ArrayDeque<>();
+
+      // We need to make the source order deterministic. The source IDs are
+      // not returned in the same order, which means that submitting the same
+      // program twice might result in different traversal, which breaks the
+      // deterministic hash assignment.
+      List<Integer> sources = new ArrayList<>();
+      for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+         sources.add(sourceNodeId);
+      }
+      Collections.sort(sources);
+
+      //
+      // Traverse the graph in a breadth-first manner. Keep in mind that
+      // the graph is not a tree and multiple paths to nodes can exist.
+      //
+
+      // Start with source nodes
+      for (Integer sourceNodeId : sources) {
+         remaining.add(streamGraph.getStreamNode(sourceNodeId));
+         visited.add(sourceNodeId);

[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-12-12 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742624#comment-15742624
 ] 

Stefan Richter commented on FLINK-4858:
---

Agreed. We cannot remove it in 1.2, because that would break backwards 
compatibility for all UDFs that use {{Checkpointed}}.
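
For illustration, a minimal sketch of the kind of UDF that would break (the
class and field names are invented; the interface and signatures are the
legacy {{Checkpointed}} contract):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

// Legacy style: the deprecated Checkpointed interface snapshots the whole UDF state.
public class LegacyCounter implements MapFunction<String, String>, Checkpointed<Long> {

   private long count;

   @Override
   public String map(String value) {
      count++;
      return value;
   }

   @Override
   public Long snapshotState(long checkpointId, long checkpointTimestamp) {
      return count; // entire function state is returned as one snapshot
   }

   @Override
   public void restoreState(Long state) {
      count = state;
   }
}
{code}

Migrating such a function means implementing {{CheckpointedFunction}}
({{snapshotState(FunctionSnapshotContext)}} / {{initializeState(FunctionInitializationContext)}})
instead, which is exactly the breaking change we cannot force on users in 1.2.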

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.2.0
>
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2966: [FLINK-5290] Ensure backwards compatibility of the...

2016-12-12 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2966#discussion_r92003177
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java
 ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+   /**
+    * Returns a map with a hash for each {@link StreamNode} of the {@link
+    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+    * identify nodes across job submissions if they didn't change.
+    *
+    * <p>The complete {@link StreamGraph} is traversed. The hash is either
+    * computed from the transformation's user-specified id (see
+    * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+    *
+    * <p>The generated hash is deterministic with respect to:
+    * <ul>
+    * <li>node-local properties (like parallelism, UDF, node ID),
+    * <li>chained output nodes, and
+    * <li>input nodes hashes
+    * </ul>
+    *
+    * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+    */
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+      // The hash function used to generate the hash
+      final HashFunction hashFunction = Hashing.murmur3_128(0);
+      final Map<Integer, byte[]> hashes = new HashMap<>();
+
+      Set<Integer> visited = new HashSet<>();
+      Queue<StreamNode> remaining = new ArrayDeque<>();
+
+      // We need to make the source order deterministic. The source IDs are
+      // not returned in the same order, which means that submitting the same
+      // program twice might result in different traversal, which breaks the
+      // deterministic hash assignment.
+      List<Integer> sources = new ArrayList<>();
+      for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+         sources.add(sourceNodeId);
+      }
+      Collections.sort(sources);
+
+      //
+      // Traverse the graph in a breadth-first manner. Keep in mind that
+      // the graph is not a tree and multiple paths to nodes can exist.
+      //
+
+      // Start with source nodes
+      for (Integer sourceNodeId : sources) {
+         remaining.add(streamGraph.getStreamNode(sourceNodeId));
+         visited.add(sourceNodeId);
+      }
+
+      StreamNode currentNode;
+      while ((currentNode = remaining.poll()) != null) {
+         // Generate the hash code. Because multiple path exist to each
+         // node,
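
A simplified, illustrative sketch (not the actual implementation under review;
it reuses the Guava imports from the quoted class) of the generation step that
the truncated comment introduces: a node's hash is derived from its position in
the deterministic traversal and then mixed with the hashes of its inputs.

{code:java}
// Sketch only: hash a node from its traversal position, then fold in input hashes.
private byte[] generateDeterministicHash(
      HashFunction hashFunction, int traversalIndex, List<byte[]> inputHashes) {

   Hasher hasher = hashFunction.newHasher();
   hasher.putInt(traversalIndex);            // node-local, order-dependent contribution

   byte[] hash = hasher.hash().asBytes();
   for (byte[] inputHash : inputHashes) {    // make the result depend on all input hashes
      for (int i = 0; i < hash.length; i++) {
         hash[i] = (byte) (hash[i] * 37 ^ inputHash[i]);
      }
   }
   return hash;
}
{code}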

[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-12-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742614#comment-15742614
 ] 

Aljoscha Krettek commented on FLINK-4858:
-

We probably can't remove it. (unfortunately)

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.2.0
>
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes

2016-12-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742609#comment-15742609
 ] 

Aljoscha Krettek commented on FLINK-5319:
-

[~twalthr] could you please have a look? This seems related to another case 
that we had a while back.

> ClassCastException when reusing an inherited method reference as KeySelector 
> for different classes
> --
>
> Key: FLINK-5319
> URL: https://issues.apache.org/jira/browse/FLINK-5319
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Alexander Chermenin
>
> Code sample:
> {code}static abstract class A {
> int id;
> A(int id) {this.id = id; }
> int getId() { return id; }
> }
> static class B extends A { B(int id) { super(id % 3); } }
> static class C extends A { C(int id) { super(id % 2); } }
> private static B b(int id) { return new B(id); }
> private static C c(int id) { return new C(id); }
> /**
>  * Main method.
>  */
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment environment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
> C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);
> DataStreamSource<B> bStream = environment.fromElements(bs);
> DataStreamSource<C> cStream = environment.fromElements(cs);
> bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
> cStream.keyBy((KeySelector<C, Integer>) A::getId).print();
> environment.execute();
> }
> {code}
> This code throws the following exception:
> {code}Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>   at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>   at 
> 

[jira] [Assigned] (FLINK-5245) Add support for BipartiteGraph mutations

2016-12-12 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-5245:
-

Assignee: Ivan Mushketyk

> Add support for BipartiteGraph mutations
> 
>
> Key: FLINK-5245
> URL: https://issues.apache.org/jira/browse/FLINK-5245
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement methods for adding and removing vertices and edges similarly to 
> Graph class.
> Depends on https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2984: [FLINK-5311] Add user documentation for bipartite graph

2016-12-12 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2984
  
Hi @vasia,

Thank you for your review. I've updated the documentation accordingly and 
added more detailed descriptions for the projection code examples. Do you think 
it would still be better to add some images?




[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs

2016-12-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742585#comment-15742585
 ] 

Aljoscha Krettek commented on FLINK-3398:
-

I just saw a lot of discussion about this on the ML so I bumped it. We can also 
reduce the importance again.

> Flink Kafka consumer should support auto-commit opt-outs
> 
>
> Key: FLINK-3398
> URL: https://issues.apache.org/jira/browse/FLINK-3398
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Shikhar Bhushan
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Currently the Kafka source will commit consumer offsets to Zookeeper, either 
> upon a checkpoint if checkpointing is enabled, or periodically based 
> on {{auto.commit.interval.ms}}.
> It should be possible to opt-out of committing consumer offsets to Zookeeper. 
> Kafka has this config as {{auto.commit.enable}} (0.8) and 
> {{enable.auto.commit}} (0.9).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5311) Write user documentation for BipartiteGraph

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742590#comment-15742590
 ] 

ASF GitHub Bot commented on FLINK-5311:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2984
  
Hi @vasia,

Thank you for your review. I've updated the documentation accordingly and 
added more detailed descriptions for the projection code examples. Do you think 
it would still be better to add some images?


> Write user documentation for BipartiteGraph
> ---
>
> Key: FLINK-5311
> URL: https://issues.apache.org/jira/browse/FLINK-5311
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> We need to add user documentation. The progress on BipartiteGraph can be 
> tracked in the following JIRA:
> https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5295) Migrate the AlignedWindowOperators to the WindowOperator and make it backwards compatible.

2016-12-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5295:

Priority: Blocker  (was: Major)

> Migrate the AlignedWindowOperators to the WindowOperator and make it 
> backwards compatible.
> --
>
> Key: FLINK-5295
> URL: https://issues.apache.org/jira/browse/FLINK-5295
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5318) Make the Rolling/Bucketing sink backwards compatible.

2016-12-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5318:

Priority: Blocker  (was: Major)

> Make the Rolling/Bucketing sink backwards compatible.
> -
>
> Key: FLINK-5318
> URL: https://issues.apache.org/jira/browse/FLINK-5318
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5293) Make the Kafka consumer backwards compatible.

2016-12-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5293:

Priority: Blocker  (was: Major)

> Make the Kafka consumer backwards compatible.
> -
>
> Key: FLINK-5293
> URL: https://issues.apache.org/jira/browse/FLINK-5293
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5294) Make the WindowOperator backwards compatible.

2016-12-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5294:

Priority: Blocker  (was: Major)

> Make the WindowOperator backwards compatible.
> -
>
> Key: FLINK-5294
> URL: https://issues.apache.org/jira/browse/FLINK-5294
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5296) Expose the old AlignedWindowOperators to the user through explicit commands.

2016-12-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5296:

Priority: Blocker  (was: Major)

> Expose the old AlignedWindowOperators to the user through explicit commands.
> 
>
> Key: FLINK-5296
> URL: https://issues.apache.org/jira/browse/FLINK-5296
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4973) Flakey Yarn tests due to recently added latency marker

2016-12-12 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-4973:


Assignee: Till Rohrmann  (was: Robert Metzger)

> Flakey Yarn tests due to recently added latency marker
> --
>
> Key: FLINK-4973
> URL: https://issues.apache.org/jira/browse/FLINK-4973
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.2.0
>
>
> The newly introduced {{LatencyMarksEmitter}} emits latency marker on the 
> {{Output}}. This can still happen after the underlying {{BufferPool}} has 
> been destroyed. The occurring exception is then logged:
> {code}
> 2016-10-29 15:00:48,088 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom File Source (1/1) switched to FINISHED
> 2016-10-29 15:00:48,089 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for Source: Custom File Source (1/1)
> 2016-10-29 15:00:48,089 INFO  org.apache.flink.yarn.YarnTaskManager   
>   - Un-registering task and sending final execution state 
> FINISHED to JobManager for task Source: Custom File Source 
> (8fe0f817fa6d960ea33f6e57e0c3891c)
> 2016-10-29 15:00:48,101 WARN  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error 
> while emitting latency marker
> java.lang.RuntimeException: Buffer pool is destroyed.
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:734)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.run(StreamSource.java:134)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:118)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:103)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
>   ... 9 more
> {code}
> This exception is clearly related to the shutdown of a stream operator and 
> does not indicate a wrong behaviour. Since the yarn tests simply scan the log 
> for some keywords (including exception) such a case can make them fail.
> Best if we could make sure that the {{LatencyMarksEmitter}} would only emit 
> latency marker if the {{Output}} would still be active. But we could also 
> simply not log exceptions which occurred after the stream operator has been 
> stopped.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/171578846/log.txt
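
A sketch of the first option mentioned in the description (hypothetical:
{{running}} stands for an invented volatile shutdown flag; the real emitter
schedules its work on an executor):

{code:java}
private void emitLatencyMarkerSafely(Output<StreamRecord<OUT>> output, LatencyMarker marker) {
   if (!running) {
      return; // the operator is shutting down; silently drop the marker
   }
   try {
      output.emitLatencyMarker(marker);
   } catch (Exception e) {
      if (running) { // only log failures that are not shutdown-related
         LOG.warn("Error while emitting latency marker", e);
      }
   }
}
{code}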



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4631) NullPointerException during stream task cleanup

2016-12-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742565#comment-15742565
 ] 

Aljoscha Krettek commented on FLINK-4631:
-

I think it's ok to close. The log entries are not pretty, but this only happens 
in the case of failure.

> NullPointerException during stream task cleanup
> ---
>
> Key: FLINK-4631
> URL: https://issues.apache.org/jira/browse/FLINK-4631
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
> Environment: Ubuntu server 12.04.5 64 bit
> java version "1.8.0_40"
> Java(TM) SE Runtime Environment (build 1.8.0_40-b26)
> Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode)
>Reporter: Avihai Berkovitz
> Fix For: 1.2.0
>
>
> If a streaming job failed during startup (in my case, due to lack of network 
> buffers), all the tasks are being cancelled before they started. This causes 
> many instances of the following exception:
> {noformat}
> 2016-09-18 14:17:12,177 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> cleanup of stream task
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742566#comment-15742566
 ] 

ASF GitHub Bot commented on FLINK-4814:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2752
  
I think this is a very nice addition, but probably needs a bit refinement:

  - I heard from various users that they would like to have a config option 
for the checkpoint directory rather than a configuration "in code"

  - I would suggest to actually retain the `state.checkpoints.dir` config 
parameter, and make every state backend respect it for externalized checkpoints

  - State backends that write to a file system (FsStateBackend, RocksDB, 
...) would respect that parameter (we might even want to deprecate their 
specific checkpoint dir parameter)

  - State backends that do not write to files will be fine with this 
parameter being absent unless one chooses to enable externalized checkpoints

  - The "in code" option overrides the config option, if both are set.

  - The option to enable externalized checkpoints should also be present in 
the configuration.

What would be nice is to merge the documentation, as far as it is 
applicable to the current state.
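
As a sketch of how the two configuration paths could look from a user's
perspective (assuming the 1.2 {{CheckpointConfig}} API; the directory value is
a placeholder):

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Config-file alternative discussed above, in flink-conf.yaml:
//   state.checkpoints.dir: hdfs:///flink/ext-checkpoints

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // checkpoint every 10 seconds
env.getCheckpointConfig().enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
{code}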


> Remove extra storage location for externalized checkpoint metadata
> --
>
> Key: FLINK-4814
> URL: https://issues.apache.org/jira/browse/FLINK-4814
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Ufuk Celebi
>
> Follow up for FLINK-4512.
> Store checkpoint meta data in checkpoint directory.  That makes it simpler 
> for users to track and clean up checkpoints manually, if they want to retain 
> externalized checkpoints across cancellations and terminal failures.
> Every state backend needs to be able to provide a storage location for the 
> checkpoint metadata. The memory state backend would hence not work with 
> externalized checkpoints, unless one sets explicitly a parameter 
> `setExternalizedCheckpointsLocation(uri)`.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2752: [FLINK-4814] [checkpointing] Use checkpoint directory for...

2016-12-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2752
  
I think this is a very nice addition, but probably needs a bit refinement:

  - I heard from various users that they would like to have a config option 
for the checkpoint directory rather than a configuration "in code"

  - I would suggest to actually retain the `state.checkpoints.dir` config 
parameter, and make every state backend respect it for externalized checkpoints

  - State backends that write to a file system (FsStateBackend, RocksDB, 
...) would respect that parameter (we might even want to deprecate their 
specific checkpoint dir parameter)

  - State backends that do not write to files will be fine with this 
parameter being absent unless one chooses to enable externalized checkpoints

  - The "in code" option overrides the config option, if both are set.

  - The option to enable externalized checkpoints should also be present in 
the configuration.

What would be nice is to merge the documentation, as far as it is 
applicable to the current state.




[GitHub] flink issue #2756: [FLINK-4997] Extending Window Function Metadata

2016-12-12 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2756
  
@VenturaDelMonte, that could require yet more tests, yes. We'll see once 
I'm done with those for the regular windows.




[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742554#comment-15742554
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2756
  
@VenturaDelMonte, that could require yet more tests, yes. We'll see once 
I'm done with those for the regular windows.


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5290) Ensure backwards compatibility of the hashes used to generate JobVertexIds

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742541#comment-15742541
 ] 

ASF GitHub Bot commented on FLINK-5290:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2966#discussion_r91997425
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java
 ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+   /**
+    * Returns a map with a hash for each {@link StreamNode} of the {@link
+    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+    * identify nodes across job submissions if they didn't change.
+    *
+    * <p>The complete {@link StreamGraph} is traversed. The hash is either
+    * computed from the transformation's user-specified id (see
+    * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+    *
+    * <p>The generated hash is deterministic with respect to:
+    * <ul>
+    * <li>node-local properties (like parallelism, UDF, node ID),
+    * <li>chained output nodes, and
+    * <li>input nodes hashes
+    * </ul>
+    *
+    * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+    */
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+      // The hash function used to generate the hash
+      final HashFunction hashFunction = Hashing.murmur3_128(0);
+      final Map<Integer, byte[]> hashes = new HashMap<>();
+
+      Set<Integer> visited = new HashSet<>();
+      Queue<StreamNode> remaining = new ArrayDeque<>();
+
+      // We need to make the source order deterministic. The source IDs are
+      // not returned in the same order, which means that submitting the same
+      // program twice might result in different traversal, which breaks the
+      // deterministic hash assignment.
+      List<Integer> sources = new ArrayList<>();
+      for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+         sources.add(sourceNodeId);
+      }
+      Collections.sort(sources);
+
+      //
+      // Traverse the graph in a breadth-first manner. Keep in mind that
+      // the graph is not a tree and multiple paths to nodes can exist.
+      //
+
+      // Start with source nodes
+      for (Integer sourceNodeId : sources) {
+         remaining.add(streamGraph.getStreamNode(sourceNodeId));
+         visited.add(sourceNodeId);

[GitHub] flink pull request #2966: [FLINK-5290] Ensure backwards compatibility of the...

2016-12-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2966#discussion_r91997425
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java
 ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+public class DefaultStreamGraphHasher implements StreamGraphHasher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStreamGraphHasher.class);
+
+   /**
+    * Returns a map with a hash for each {@link StreamNode} of the {@link
+    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+    * identify nodes across job submissions if they didn't change.
+    *
+    * <p>The complete {@link StreamGraph} is traversed. The hash is either
+    * computed from the transformation's user-specified id (see
+    * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+    *
+    * <p>The generated hash is deterministic with respect to:
+    * <ul>
+    * <li>node-local properties (like parallelism, UDF, node ID),
+    * <li>chained output nodes, and
+    * <li>input nodes hashes
+    * </ul>
+    *
+    * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+    */
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+      // The hash function used to generate the hash
+      final HashFunction hashFunction = Hashing.murmur3_128(0);
+      final Map<Integer, byte[]> hashes = new HashMap<>();
+
+      Set<Integer> visited = new HashSet<>();
+      Queue<StreamNode> remaining = new ArrayDeque<>();
+
+      // We need to make the source order deterministic. The source IDs are
+      // not returned in the same order, which means that submitting the same
+      // program twice might result in different traversal, which breaks the
+      // deterministic hash assignment.
+      List<Integer> sources = new ArrayList<>();
+      for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+         sources.add(sourceNodeId);
+      }
+      Collections.sort(sources);
+
+      //
+      // Traverse the graph in a breadth-first manner. Keep in mind that
+      // the graph is not a tree and multiple paths to nodes can exist.
+      //
+
+      // Start with source nodes
+      for (Integer sourceNodeId : sources) {
+         remaining.add(streamGraph.getStreamNode(sourceNodeId));
+         visited.add(sourceNodeId);
+      }
+
+      StreamNode currentNode;
+      while ((currentNode = remaining.poll()) != null) {
+         // Generate the hash code. Because multiple path exist to each
+         //

[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-12-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742521#comment-15742521
 ] 

Robert Metzger commented on FLINK-4858:
---

Are we going to remove the Checkpointed interface before the 1.2 release? I 
think we should maybe deprecate it in 1.2 and remove it in 1.3?
(I'm basically wondering whether we can remove "fix-for-version 1.2".)

> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.2.0
>
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...

2016-12-12 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2928
  
I will go ahead and merge this PR since there have been no further comments.




[jira] [Commented] (FLINK-5108) Remove ClientShutdownHook during job execution

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742520#comment-15742520
 ] 

ASF GitHub Bot commented on FLINK-5108:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2928
  
I will go ahead and merge this PR since there have been no further comments.


> Remove ClientShutdownHook during job execution
> --
>
> Key: FLINK-5108
> URL: https://issues.apache.org/jira/browse/FLINK-5108
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Renkai Ge
>Priority: Blocker
> Fix For: 1.2.0
>
>
> The behavior of the Standalone mode is to not react to client interrupts once 
> a job has been deployed. We should change the Yarn client implementation to 
> behave the same. This avoids accidental shutdown of the job, e.g. when the 
> user sends an interrupt via CTRL-C or when the client machine shuts down.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2984: [FLINK-5311] Add user documentation for bipartite ...

2016-12-12 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2984#discussion_r91995428
  
--- Diff: docs/dev/libs/gelly/bipartite_graph.md ---
@@ -0,0 +1,148 @@
+---
+title: Graph Generators
--- End diff --

Oops, right :)




[jira] [Commented] (FLINK-5311) Write user documentation for BipartiteGraph

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742515#comment-15742515
 ] 

ASF GitHub Bot commented on FLINK-5311:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2984#discussion_r91995428
  
--- Diff: docs/dev/libs/gelly/bipartite_graph.md ---
@@ -0,0 +1,148 @@
+---
+title: Graph Generators
--- End diff --

Oops, right :)


> Write user documentation for BipartiteGraph
> ---
>
> Key: FLINK-5311
> URL: https://issues.apache.org/jira/browse/FLINK-5311
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> We need to add user documentation. The progress on BipartiteGraph can be 
> tracked in the following JIRA:
> https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-12 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742507#comment-15742507
 ] 

Ivan Mushketyk commented on FLINK-5280:
---

Hi [~fhueske] ,

Thank you for your comments. It's much clearer now, but it seems that I am 
either still missing something obvious or the task is more involved than 
described.

Let me first describe how I understand this issue so that you can correct me.

The goal of this task is to support nested data structures. That means that 
if we have a type definition like this:

{code:java}
class ParentPojo {
  ChildPojo child;
  int num;
}

class ChildPojo {
  String str;
}
{code}

and we have a *TableSource* that returns a dataset of *ParentPojo*, we can 
access nested fields in SQL queries. Something like:

{code:sql}
SELECT * FROM pojos WHERE child.str LIKE '%Rubber%'
{code}

In this case *child.str* is a way to access a nested field.

The first thing that confuses me is that the current [SQL 
grammar|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#sql-syntax]
 does not seem to support any nested field access, but I think that may be a 
relatively minor nuisance.

If I understand it correctly, *flink-table* internally converts any input into a 
dataset of Rows and then performs operations on it. To convert a nested 
*ParentPojo* into a flat schema, we can extract all leaf values into two columns:

{code}
child.str
num
{code}

similarly to how *Parquet* identifies columns in nested types (see the 
following 
[slide|http://www.slideshare.net/julienledem/strata-london-2016-the-future-of-column-oriented-data-processing-with-arrow-and-parquet/10?src=clipshare])

Now, here is where this becomes more interesting. If I understand it correctly, 
*BatchScan#convertToExpectedType* is used to convert an input dataset into a 
dataset of *Row*s. For this task it generates a mapper function in 
*FlinkRel#getConversionMapper*, which then calls 
*CodeGenerator#generateConverterResultExpression*.

So in our case it should generate code similar to:

{code:java}
public Row map(ParentPojo parent) {
Row row = new Row(2);
row.setField(0, parent.child.str);
row.setField(1, parent.num);

return row;
}
{code}

*CodeGenerator* accepts *fieldNames* and an optional POJO field mapping to 
generate accessors. The main work seems to be performed in 
*CodeGenerator#generateFieldAccess*, which generates access code for top-level 
POJO fields by name but does not create any code that accesses nested fields.

Therefore, if I understand this correctly, we need to start with updating 
*CodeGenerator* to generate nested accessors and then we can extend 
*TableSource* to support nested data.

Am I overthinking this issue? Or am I missing something obvious?
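
To make the "parquet-style" flattening above concrete, here is a hypothetical
helper (class and method names are invented) that walks Flink's
{{CompositeType}} information recursively and collects leaf column paths such
as {{child.str}} and {{num}}:

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;

public class NestedFieldsSketch {

   static void collectLeafFields(CompositeType<?> type, String prefix, List<String> out) {
      for (int i = 0; i < type.getArity(); i++) {
         String name = type.getFieldNames()[i];
         String path = prefix.isEmpty() ? name : prefix + "." + name;
         TypeInformation<?> fieldType = type.getTypeAt(i);
         if (fieldType instanceof CompositeType) {
            // recurse into nested POJO / tuple / row types
            collectLeafFields((CompositeType<?>) fieldType, path, out);
         } else {
            out.add(path); // leaf column, e.g. "child.str" or "num"
         }
      }
   }

   static List<String> leafFields(CompositeType<?> root) {
      List<String> out = new ArrayList<>();
      collectLeafFields(root, "", out);
      return out;
   }
}
{code}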




> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes

2016-12-12 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742502#comment-15742502
 ] 

Chesnay Schepler commented on FLINK-5319:
-

Does this also happen if you remove all code related to C?

> ClassCastException when reusing an inherited method reference as KeySelector 
> for different classes
> --
>
> Key: FLINK-5319
> URL: https://issues.apache.org/jira/browse/FLINK-5319
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Alexander Chermenin
>
> Code sample:
> {code}
> static abstract class A {
>     int id;
>     A(int id) { this.id = id; }
>     int getId() { return id; }
> }
>
> static class B extends A { B(int id) { super(id % 3); } }
> static class C extends A { C(int id) { super(id % 2); } }
>
> private static B b(int id) { return new B(id); }
> private static C c(int id) { return new C(id); }
>
> /**
>  * Main method.
>  */
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment environment =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
>     C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);
>
>     DataStreamSource<B> bStream = environment.fromElements(bs);
>     DataStreamSource<C> cStream = environment.fromElements(cs);
>
>     bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
>     cStream.keyBy((KeySelector<C, Integer>) A::getId).print();
>
>     environment.execute();
> }
> {code}
> This code throws the following exception:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>   at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
> ...
> {code}

[jira] [Commented] (FLINK-5321) FlinkMiniCluster does not start Jobmanager MetricQueryService

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742500#comment-15742500
 ] 

ASF GitHub Bot commented on FLINK-5321:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/2991

[FLINK-5321] [metrics] LocalFlinkMiniCluster starts JM MetricQueryService



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5321_mqs_fmc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2991


commit e943a2631c2750af9b1798c6a32929c3e3dac610
Author: zentol 
Date:   2016-12-12T17:15:14Z

[FLINK-5321] [metrics] LocalFlinkMiniCluster starts JM MetricQS




> FlinkMiniCluster does not start Jobmanager MetricQueryService
> -
>
> Key: FLINK-5321
> URL: https://issues.apache.org/jira/browse/FLINK-5321
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.2.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> The JobManager MetricQueryService is never started when using the 
> LocalFlinkMiniCluster. It lacks the call to 
> MetricRegistry#startQueryService().
> As a result jobmanager metrics aren't reported to the web frontend, and it 
> causes repeated logging of exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742498#comment-15742498
 ] 

ASF GitHub Bot commented on FLINK-5113:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2939
  
@zentol Thanks for the review. I integrated your comments.


> Make all Testing Functions implement CheckpointedFunction Interface.
> 
>
> Key: FLINK-5113
> URL: https://issues.apache.org/jira/browse/FLINK-5113
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently stateful functions implement the (old) Checkpointed interface.
> This issue aims at porting all these functions to the new 
> CheckpointedFunction interface, so that they can leverage the new 
> capabilities it provides.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742489#comment-15742489
 ] 

ASF GitHub Bot commented on FLINK-5113:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2939
  
The test should cover whether something semantically wrong happened during 
restoring. For example, if you expected some state that never arrived, the 
test should fail. This is not a matter of the interface and thus not part of 
this PR, I think.

What do you think?


> Make all Testing Functions implement CheckpointedFunction Interface.
> 
>
> Key: FLINK-5113
> URL: https://issues.apache.org/jira/browse/FLINK-5113
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently stateful functions implement the (old) Checkpointed interface.
> This issue aims at porting all these functions to the new 
> CheckpointedFunction interface, so that they can leverage the new 
> capabilities it provides.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5321) FlinkMiniCluster does not start Jobmanager MetricQueryService

2016-12-12 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-5321:
---

 Summary: FlinkMiniCluster does not start Jobmanager 
MetricQueryService
 Key: FLINK-5321
 URL: https://issues.apache.org/jira/browse/FLINK-5321
 Project: Flink
  Issue Type: Bug
  Components: JobManager, Metrics
Affects Versions: 1.2.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.2.0


The JobManager MetricQueryService is never started when using the 
LocalFlinkMiniCluster. It lacks the call to MetricRegistry#startQueryService().

As a result jobmanager metrics aren't reported to the web frontend, and it 
causes repeated logging of exceptions.
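
A sketch of the missing wiring (the exact signature and surrounding code are 
assumptions based on the issue text, not verified against the codebase):

{code:java}
// Sketch only: after the mini cluster creates the JobManager actor system,
// start the metric query service so jobmanager metrics reach the web frontend.
metricRegistry.startQueryService(jobManagerActorSystem);
{code}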



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742469#comment-15742469
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r91991599
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest 
request, Throwable failure)
requestIndexer = new BulkProcessorIndexer(bulkProcessor);
}
 
+   /**
+* Adds all requests of the bulk to the BulkProcessor. Used when trying 
again.
+* @param bulkRequest
+*/
+   public void reAddBulkRequest(BulkRequest bulkRequest) {
+   //TODO Check what happens when bulk contains a DeleteAction and 
IndexActions and the DeleteAction fails because the document already has been 
deleted. This may not happen in typical Flink jobs.
--- End diff --

Currently I'm not aware of a way to filter these requests.


> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the 
> "at least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be created 
> and rebalanced. On such errors the bulk should be retried instead of being 
> discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.
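> A rough sketch of the retry idea, reusing the afterBulk callback and the 
> reAddBulkRequest helper from the PR diff above (error classification is 
> omitted; whether every kind of failure should be retried is an open question):
> {code:java}
> @Override
> public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
>     // the whole bulk failed, e.g. with UnavailableShardsException or
>     // EsRejectedExecutionException under high load
>     LOG.error("Failed to index bulk in Elasticsearch.", failure);  // LOG: the sink's logger (assumption)
>     // re-queue all requests of this bulk for another attempt
>     reAddBulkRequest(request);
> }
> {code}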



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5320) Fix result TypeInformation in WindowedStream.fold(ACC, FoldFunction, WindowFunction)

2016-12-12 Thread Yassine Marzougui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yassine Marzougui updated FLINK-5320:
-
Affects Version/s: 1.2.0

> Fix result TypeInformation in WindowedStream.fold(ACC, FoldFunction, 
> WindowFunction)
> 
>
> Key: FLINK-5320
> URL: https://issues.apache.org/jira/browse/FLINK-5320
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Yassine Marzougui
>Assignee: Yassine Marzougui
>
> The WindowedStream.fold(ACC, FoldFunction, WindowFunction) does not correctly 
> infer the resultType.
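> For illustration (the input type, key field, and window are my own, a sketch 
> of the affected call path): the result type must come from the WindowFunction 
> output, not from the fold accumulator:
> {code:java}
> // input is a DataStream<Event>; Event is a hypothetical POJO with an "id" field
> DataStream<String> result = input
>     .keyBy("id")
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .fold(0, new FoldFunction<Event, Integer>() {
>         @Override
>         public Integer fold(Integer acc, Event value) {
>             return acc + 1;                        // accumulator type: Integer
>         }
>     }, new WindowFunction<Integer, String, Tuple, TimeWindow>() {
>         @Override
>         public void apply(Tuple key, TimeWindow window,
>                 Iterable<Integer> counts, Collector<String> out) {
>             for (Integer count : counts) {
>                 out.collect("count=" + count);     // result type: String
>             }
>         }
>     });
> // the inferred resultType should be String, not Integer
> {code}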



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

