[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028376#comment-15028376
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159851509
  
Good addition! More control over our checkpointing is something people have 
been asking me about at talks.

Could you add a few sentences to the documentation about this?


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of concurrent inflight checkpoints, as well as the checkpoint timeouts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
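
The option discussed in this thread surfaces through Flink's checkpoint 
configuration. As a rough sketch (method names as exposed on CheckpointConfig 
in later Flink releases; treat the exact signatures as illustrative), limiting 
in-flight checkpoints and bounding their duration looks like:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointLimits {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 5 seconds.
        env.enableCheckpointing(5000);

        CheckpointConfig config = env.getCheckpointConfig();
        // At most one checkpoint may be in flight at a time; a new one is
        // only triggered once the previous one completes or times out.
        config.setMaxConcurrentCheckpoints(1);
        // Abort any checkpoint that has not completed within 60 seconds.
        config.setCheckpointTimeout(60000);
    }
}
```

These are the two limits the JIRA description asks for: a cap on concurrent 
inflight checkpoints plus a checkpoint timeout.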


[GitHub] flink pull request: [FLINK-2488] Expose Attempt Number in RuntimeC...

2015-11-26 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1386#issuecomment-159853758
  
I'll wait for Stephan to review this then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159866582
  
Where in the docs would be the best place for that?
  - A new entry in the "Programming Guides" menu?
  - Or a section in the streaming guide?

I would vote for the first.




[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028464#comment-15028464
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159866582
  
Where in the docs would be the best place for that?
  - A new entry in the "Programming Guides" menu?
  - Or a section in the streaming guide?

I would vote for the first.


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of concurrent inflight checkpoints, as well as the checkpoint timeouts.





[jira] [Commented] (FLINK-2960) Communicate with JobManager through YARN proxy

2015-11-26 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028477#comment-15028477
 ] 

Robert Metzger commented on FLINK-2960:
---

I'm sorry, it won't happen again.

> Communicate with JobManager through YARN proxy
> --
>
> Key: FLINK-2960
> URL: https://issues.apache.org/jira/browse/FLINK-2960
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 0.10.0
>Reporter: Maximilian Michels
>
> In a secured environment, ports on the ApplicationMaster may be blocked. 
> Thus, runtime messages have to pass through YARN interfaces and then be 
> forwarded to the ApplicationMaster/JobManager.





[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159868163
  
I think it fits best into the "Fault Tolerance" section of the Streaming 
Guide: 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#fault-tolerance

If you think that section will grow too big, we could move it to a new page 
and link it from the streaming guide.




[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028486#comment-15028486
 ] 

ASF GitHub Bot commented on FLINK-3046:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159869432
  
Maybe I am not aware of the limitations, but things like this don't seem to 
work:

    TypeExtractor.getForObject(Either.<String, Long>left(""));

    source.map(new MapFunction<Long, Either<String, Long>>() {
        @Override
        public Either<String, Long> map(Long value) throws Exception {
            return null;
        }
    });


> Integrate the Either Java type with the TypeExtractor
> -
>
> Key: FLINK-3046
> URL: https://issues.apache.org/jira/browse/FLINK-3046
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Vasia Kalavri
>Assignee: Timo Walther
>
> Integrate the Either Java type with the TypeExtractor, so that the APIs 
> recognize the type and choose the type info properly.





[GitHub] flink pull request: [FLINK-3046] Integrate the Either Java type wi...

2015-11-26 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159869432
  
Maybe I am not aware of the limitations, but things like this don't seem to 
work:

    TypeExtractor.getForObject(Either.<String, Long>left(""));

    source.map(new MapFunction<Long, Either<String, Long>>() {
        @Override
        public Either<String, Long> map(Long value) throws Exception {
            return null;
        }
    });




[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028504#comment-15028504
 ] 

ASF GitHub Bot commented on FLINK-3046:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159871232
  
The second variant should work (in the MapFunction signature) - if not, 
that is a bug.

The first variant cannot work because the type for "right" is nowhere 
available in a non-erased form.
As a replacement for TypeExtractor.getForObject() I suggested this: 
https://issues.apache.org/jira/browse/FLINK-2788


> Integrate the Either Java type with the TypeExtractor
> -
>
> Key: FLINK-3046
> URL: https://issues.apache.org/jira/browse/FLINK-3046
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Vasia Kalavri
>Assignee: Timo Walther
>
> Integrate the Either Java type with the TypeExtractor, so that the APIs 
> recognize the type and choose the type info properly.



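
Stephan's erasure argument can be demonstrated with plain JDK reflection, 
independent of Flink (ArrayList stands in here for Either; the class names 
are purely illustrative). A plain instance loses its type arguments at 
runtime, while an anonymous subclass records them in its class file — the 
mechanism behind the TypeHint-style replacement proposed in FLINK-2788:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;

public class ErasureDemo {
    public static void main(String[] args) {
        // Erasure: at runtime a plain instance only knows its raw class,
        // so nothing like TypeExtractor.getForObject() can recover <String>.
        ArrayList<String> plain = new ArrayList<String>();
        System.out.println(plain.getClass().getSuperclass());
        // -> class java.util.AbstractList

        // An anonymous subclass bakes the type argument into its generic
        // superclass, so the argument survives erasure.
        ArrayList<String> hinted = new ArrayList<String>() {};
        Type sup = hinted.getClass().getGenericSuperclass();
        Type arg = ((ParameterizedType) sup).getActualTypeArguments()[0];
        System.out.println(arg);
        // -> class java.lang.String
    }
}
```

This is why the MapFunction variant (where the full signature is compiled 
into the class) can be analyzed, while an already-constructed object cannot.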


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963943
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
 ---
@@ -33,7 +30,8 @@
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashMap;
 
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-3046] Integrate the Either Java type wi...

2015-11-26 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159871753
  
It gives the error: "Usage of class Either as a type is not allowed. Use a 
concrete subclass instead."




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963933
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

The use of null is often problematic. I prefer default values.




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028505#comment-15028505
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963921
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt, and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.



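
For contrast, the semantics the fix aims for — all inputs merged into one 
logical stream — correspond to the union operator in Flink's own DataStream 
API. A minimal, purely illustrative sketch:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Three sources, analogous to the three spouts in the example.
        DataStream<Long> s1 = env.generateSequence(0, 10);
        DataStream<Long> s2 = env.generateSequence(0, 8);
        DataStream<Long> s3 = env.generateSequence(0, 13);

        // A consumer of multiple inputs should see the union of all
        // three streams, not the data from just one of them.
        s1.union(s2, s3).print();

        env.execute("union sketch");
    }
}
```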


[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028508#comment-15028508
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963943
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
 ---
@@ -33,7 +30,8 @@
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashMap;
 
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt, and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[jira] [Commented] (FLINK-3055) ExecutionVertex has duplicate method getParallelSubtaskIndex and getSubTaskIndex

2015-11-26 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028509#comment-15028509
 ] 

Chesnay Schepler commented on FLINK-3055:
-

If I saw it correctly, [~rmetzger] added the getSubTaskIndex() method at a time 
when getParallelSubtaskIndex() already existed. Maybe he can chime in on why he 
added the new method? (although it may be quite a bit in the past)

> ExecutionVertex has duplicate method getParallelSubtaskIndex and 
> getSubTaskIndex
> 
>
> Key: FLINK-3055
> URL: https://issues.apache.org/jira/browse/FLINK-3055
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Ufuk Celebi
>Assignee: jun aoki
>Priority: Trivial
>
> In {{ExecutionVertex}}:
> {code}
> public int getSubTaskIndex() {
>   return subTaskIndex;
> }
> public int getParallelSubtaskIndex() {
>   return this.subTaskIndex;
> }
> {code}





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964023
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
 ---
@@ -17,12 +17,12 @@
 
 package org.apache.flink.storm.wrappers;
 
-import java.util.HashMap;
-
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028511#comment-15028511
 ] 

ASF GitHub Bot commented on FLINK-3046:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159871753
  
It gives the error: "Usage of class Either as a type is not allowed. Use a 
concrete subclass instead."


> Integrate the Either Java type with the TypeExtractor
> -
>
> Key: FLINK-3046
> URL: https://issues.apache.org/jira/browse/FLINK-3046
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Vasia Kalavri
>Assignee: Timo Walther
>
> Integrate the Either Java type with the TypeExtractor, so that the APIs 
> recognize the type and choose the type info properly.





[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028507#comment-15028507
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45963933
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

The use of null is often problematic. I prefer default values.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt, and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159872110
  
Should we pull that into a separate document? It becomes quite large...




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028513#comment-15028513
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964023
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
 ---
@@ -17,12 +17,12 @@
 
 package org.apache.flink.storm.wrappers;
 
-import java.util.HashMap;
-
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt, and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028514#comment-15028514
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159872110
  
Should we pull that into a separate document? It becomes quite large...


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of concurrent inflight checkpoints, as well as the checkpoint timeouts.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964241
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
 ---
@@ -18,9 +18,7 @@
 
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159872462
  
+1




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028516#comment-15028516
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964241
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
 ---
@@ -18,9 +18,7 @@
 
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt, and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028518#comment-15028518
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159872462
  
+1


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of concurrent inflight checkpoints, as well as the checkpoint timeouts.





[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028523#comment-15028523
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964610
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new 
TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new 
TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
--- End diff --

ok


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.
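The unioned behavior the report asks for can be sketched in plain Java (illustrative only; `unionAll` is a hypothetical helper, not Storm or Flink API): a single consumer instance must observe the elements of every input stream, rather than one replicated consumer per stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnionSemantics {
    // Union semantics: one consumer sees every element of every input stream.
    static List<Integer> unionAll(List<List<Integer>> inputs) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> in : inputs) {
            merged.addAll(in);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stand-ins for the three FiniteRandomSpout sources from the report.
        List<List<Integer>> sources = Arrays.asList(
                Arrays.asList(0, 10), Arrays.asList(1, 8), Arrays.asList(2, 13));
        // The buggy translation hands each source to a separate bolt replica;
        // the expected translation feeds this union to a single bolt instance.
        System.out.println(unionAll(sources)); // prints [0, 10, 1, 8, 2, 13]
    }
}
```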



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964610
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964692
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
+   public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
+   TopologyBuilder builder = new TopologyBuilder();
 
-   Assert.assertEquals(0, topology.getNumberOfTasks());
+   builder.setSpout("spout", new TestDummySpout());
+   builder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
+   TestDummySpout.spoutStreamId, new Fields("id"));
 
-   topology.increaseNumberOfTasks(3);
-   Assert.assertEquals(3, topology.getNumberOfTasks());
+   FlinkTopology.createTopology(builder);
+   }
 
-   topology.increaseNumberOfTasks(2);
-   Assert.assertEquals(5, topology.getNumberOfTasks());
+   @Test
+   @Ignore
--- End diff --

ok




[jira] [Commented] (FLINK-3055) ExecutionVertex has duplicate method getParallelSubtaskIndex and getSubTaskIndex

2015-11-26 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028522#comment-15028522
 ] 

Robert Metzger commented on FLINK-3055:
---

I don't know why I added a second getter for this. Most likely I just overlooked 
the existing getParallelSubtaskIndex. Since getParallelSubtaskIndex is used in so many 
places, I would probably keep that one.

> ExecutionVertex has duplicate method getParallelSubtaskIndex and 
> getSubTaskIndex
> 
>
> Key: FLINK-3055
> URL: https://issues.apache.org/jira/browse/FLINK-3055
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Ufuk Celebi
>Assignee: jun aoki
>Priority: Trivial
>
> In {{ExecutionVertex}}:
> {code}
> public int getSubTaskIndex() {
>   return subTaskIndex;
> }
> public int getParallelSubtaskIndex() {
>   return this.subTaskIndex;
> }
> {code}
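One way to resolve the duplication, sketched below as an assumption rather than the actual patch: keep `getParallelSubtaskIndex` (the name used in many places) and let the redundant getter delegate to it, marked deprecated until callers migrate.

```java
// Minimal sketch of the consolidated getters; not the real ExecutionVertex.
class ExecutionVertexSketch {
    private final int subTaskIndex;

    ExecutionVertexSketch(int subTaskIndex) {
        this.subTaskIndex = subTaskIndex;
    }

    /** The getter to keep: it is the one already used throughout the code base. */
    int getParallelSubtaskIndex() {
        return subTaskIndex;
    }

    /** Transitional alias that delegates; remove once all callers migrate. */
    @Deprecated
    int getSubTaskIndex() {
        return getParallelSubtaskIndex();
    }
}
```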





[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028525#comment-15028525
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964692
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 ---
@@ -14,50 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopology;
-import org.junit.Assert;
+
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlinkTopologyTest {
 
-   @Test
-   public void testDefaultParallelism() {
-   final FlinkTopology topology = new FlinkTopology();
-   Assert.assertEquals(1, topology.getParallelism());
+   @Test(expected = RuntimeException.class)
+   public void testUnknowSpout() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecute() throws Exception {
-   new FlinkTopology().execute();
+   @Test(expected = RuntimeException.class)
+   public void testUnknowBolt() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
+   builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
+
+   FlinkTopology.createTopology(builder);
}
 
-   @Test(expected = UnsupportedOperationException.class)
-   public void testExecuteWithName() throws Exception {
-   new FlinkTopology().execute(null);
+   @Test(expected = RuntimeException.class)
+   public void testUndeclaredStream() {
+   TopologyBuilder builder = new TopologyBuilder();
+   builder.setSpout("spout", new TestSpout());
+   builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
+
+   FlinkTopology.createTopology(builder);
}
 
@Test
-   public void testNumberOfTasks() {
-   final FlinkTopology topology = new FlinkTopology();
+   @Ignore
+   public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
+   TopologyBuilder builder = new TopologyBuilder();
 
-   Assert.assertEquals(0, topology.getNumberOfTasks());
+   builder.setSpout("spout", new TestDummySpout());
+   builder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
+   TestDummySpout.spoutStreamId, new Fields("id"));
 
-   topology.increaseNumberOfTasks(3);
-   Assert.assertEquals(3, topology.getNumberOfTasks());
+   FlinkTopology.createTopology(builder);
+   }
 
-   topology.increaseNumberOfTasks(2);
-   Assert.assertEquals(5, topology.getNumberOfTasks());
+   @Test
+   @Ignore
--- End diff --

ok


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.

[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964776
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
 ---
@@ -16,14 +16,14 @@
  */
 package org.apache.flink.storm.api;
 
-import java.util.Map;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 
+import java.util.Map;
+
 public class TestBolt implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964866
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
+import java.util.Map;
+
 public class TestDummySpout implements IRichSpout {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964832
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
 ---
@@ -16,13 +16,13 @@
  */
 package org.apache.flink.storm.api;
 
-import java.util.Map;
-
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 
+import java.util.Map;
+
 public class TestSpout implements IRichSpout {
private static final long serialVersionUID = -4884029383198924007L;
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028527#comment-15028527
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964776
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
 ---
@@ -16,14 +16,14 @@
  */
 package org.apache.flink.storm.api;
 
-import java.util.Map;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 
+import java.util.Map;
+
 public class TestBolt implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964853
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import java.util.Map;
+
 public class TestDummyBolt implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964876
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
 ---
@@ -16,16 +16,16 @@
  */
 package org.apache.flink.storm.util;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 public class TestSink implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028529#comment-15028529
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964853
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import java.util.Map;
+
 public class TestDummyBolt implements IRichBolt {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028530#comment-15028530
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964866
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
 ---
@@ -26,6 +24,8 @@
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
+import java.util.Map;
+
 public class TestDummySpout implements IRichSpout {
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028532#comment-15028532
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45964904
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
 ---
@@ -21,7 +21,6 @@
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
-
 import org.apache.flink.api.common.ExecutionConfig;
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45965316
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

I cannot see why it did not work before. Can you explain what the problem 
was?




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028535#comment-15028535
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45965316
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

I cannot see why it did not work before. Can you explain what the problem 
was?


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028538#comment-15028538
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159876995
  
Will do this in a separate pull request as a follow-up.

Any concerns about merging this?


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of concurrent inflight checkpoints, as well as the checkpoint timeouts.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45965523
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

I see. dataArtisans committers can make any change, but external contributors 
get bullied if they apply similar changes... It is not against you or the 
change itself -- it unifies the style, which does make sense. But I got 
bullied multiple times in other PRs when I did similar stuff...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028537#comment-15028537
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45965523
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

I see. dataArtisans committers can make any change, but external contributors 
get bullied if they apply similar changes... It is not against you or the 
change itself -- it unifies the style, which does make sense. But I got 
bullied multiple times in other PRs when I did similar stuff...


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45966621
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
 ---
@@ -20,11 +20,9 @@
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028551#comment-15028551
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45966621
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
 ---
@@ -20,11 +20,9 @@
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
--- End diff --

It follows the import style of the other classes, so I'll leave this as it 
is.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45967336
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

Not sure who is bullying whom :) Look at the classes and you will see that 
all imports are arranged like this. We want to be consistent, right? Following 
your suggestion, I changed the other import statements that were pure 
reformatting.

Open source is often about compromises. Very rarely will you find that 
another person's code style reflects exactly how you would do it. I'm making 
compromises and changing things as you like them. That's fine for me. Please 
don't give me a harder time by blaming my employer. I'm not aware that I have 
done something like this to you. Next time you get blamed for something like 
this, please contact me and I'll try to help you. I don't think this is the 
right place to sort things out.




[jira] [Created] (FLINK-3081) Kafka Periodic Offset Committer does not properly terminate on canceling

2015-11-26 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3081:
---

 Summary: Kafka Periodic Offset Committer does not properly 
terminate on canceling
 Key: FLINK-3081
 URL: https://issues.apache.org/jira/browse/FLINK-3081
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 0.10.1
Reporter: Stephan Ewen
Priority: Blocker
 Fix For: 1.0.0, 0.10.2


The committer is only stopped at the end of the run method. Any termination of 
the run method via an exception keeps the periodic committer thread running.
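A minimal sketch of the kind of fix this implies: moving the committer shutdown into a finally block, so it also runs when the fetch loop exits with an exception or on cancellation. Class and method names are illustrative, not the actual consumer code.

```java
// Illustrative sketch of stopping a periodic committer thread in a
// finally block instead of at the end of run().
class FetcherLoop {
    private final Thread offsetCommitter;

    FetcherLoop(Runnable committerTask) {
        this.offsetCommitter = new Thread(committerTask, "periodic-offset-committer");
    }

    boolean committerAlive() {
        return offsetCommitter.isAlive();
    }

    void run() {
        offsetCommitter.start();
        try {
            // ... fetch records and emit them; this may throw ...
        } finally {
            // reached on normal return AND on exceptions, unlike a stop
            // call placed only at the end of the try block
            offsetCommitter.interrupt();
            try {
                offsetCommitter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```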





[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028613#comment-15028613
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45970338
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -243,6 +243,20 @@
 */
public static final String YARN_PROPERTIES_FILE_LOCATION = 
"yarn.properties-file.location";
 
+   /**
+* Prefix for passing custom environment variables to Flink's 
ApplicationMaster (JobManager).
+* For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
+*  yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+* in the flink-conf.yaml.
+*/
+   public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = 
"yarn.application-master.env.";
+
+   /**
+* Similar to the {@link #YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
+* setting custom environment variables.
+*/
+   public static final String YARN_TASK_MANAGER_ENV_PREFIX = 
"yarn.taskmanager.env.";
--- End diff --

I wonder about the naming here. Maybe this should be 
`YARN_RESOURCE_MANAGER_ENV_PREFIX`? Or change 
`YARN_APPLICATION_MASTER_ENV_PREFIX` to `YARN_JOB_MANAGER_ENV_PREFIX`?


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This puts Flink at a serious disadvantage when customers need such a feature.





[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45970372
  
--- Diff: 
flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java ---
@@ -97,6 +98,18 @@ public void tooMuchCutoff() {
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
 
+   @Test
+   public void testGetEnvironmentVariables() {
+   Configuration testConf = new Configuration();
+   
testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", 
"/usr/lib/native");
+
+   Map&lt;String, String&gt; res = 
Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+   Assert.assertEquals(1, res.size());
+   Map.Entry&lt;String, String&gt; entry = 
res.entrySet().iterator().next();
+   Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
+   Assert.assertEquals("/usr/lib/native", entry.getValue());
+   }
--- End diff --

There is no test for the task manager variables.




[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028615#comment-15028615
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45970372
  
--- Diff: 
flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java ---
@@ -97,6 +98,18 @@ public void tooMuchCutoff() {
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
 
+   @Test
+   public void testGetEnvironmentVariables() {
+   Configuration testConf = new Configuration();
+   
testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", 
"/usr/lib/native");
+
+   Map&lt;String, String&gt; res = 
Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+   Assert.assertEquals(1, res.size());
+   Map.Entry&lt;String, String&gt; entry = 
res.entrySet().iterator().next();
+   Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
+   Assert.assertEquals("/usr/lib/native", entry.getValue());
+   }
--- End diff --

There is no test for the task manager variables.


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This puts Flink at a serious disadvantage when customers need such a feature.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45970389
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

Good, we are on the same page. And I don't want to bully you! I only 
mentioned the classes that do not contain any actual code change -- actually, 
according to the coding guidelines, there should be no import-order changes 
even in the classes with code changes, but I did not comment on those -- just 
on the classes with pure reformatting. I like consistency, so please apply the 
changes to all classes. But when I did import re-orderings or made code 
formatting consistent (if it was inconsistent), I was always told "don't do 
this". So if it is a general rule, I just point it out here, too. I did not 
come up with the rule, and I never force my own code style -- I always adapt 
to the given style. :) It is really about time to get a proper Maven 
formatting tool running to get rid of all these stupid discussions. (And as I 
said already: "It is not against you or the change itself" -- but the process 
seems to be inconsistent; people follow the rules more or less strictly.)




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028617#comment-15028617
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45970389
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 ---
@@ -27,13 +27,12 @@
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
--- End diff --

Good, we are on the same page. And I don't want to bully you! I only 
mentioned the classes that do not contain any actual code change -- actually, 
according to the coding guidelines, there should be no import-order changes 
even in the classes with code changes, but I did not comment on those -- just 
on the classes with pure reformatting. I like consistency, so please apply the 
changes to all classes. But when I did import re-orderings or made code 
formatting consistent (if it was inconsistent), I was always told "don't do 
this". So if it is a general rule, I just point it out here, too. I did not 
come up with the rule, and I never force my own code style -- I always adapt 
to the given style. :) It is really about time to get a proper Maven 
formatting tool running to get rid of all these stupid discussions. (And as I 
said already: "It is not against you or the change itself" -- but the process 
seems to be inconsistent; people follow the rules more or less strictly.)


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45970591
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -221,4 +210,22 @@ public static void addToEnvironment(Map&lt;String, String&gt; environment,
private Utils() {
throw new RuntimeException();
}
+
+   /**
+* Method to extract environment variables from the flinkConfiguration 
based on the given prefix String.
+*
+* @param envPrefix Prefix for the environment variables key
+* @param flinkConfiguration The Flink config to get the environment 
variable definition from
+*/
+   public static Map&lt;String, String&gt; getEnvironmentVariables(String 
envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) {
+   Map&lt;String, String&gt; result = new HashMap&lt;&gt;();
+   for(Map.Entry&lt;String, String&gt; entry: 
flinkConfiguration.toMap().entrySet()) {
+   if(entry.getKey().startsWith(envPrefix)) {
+   // remove prefix
+   String key = 
entry.getKey().substring(envPrefix.length());
--- End diff --

What happens if the key is `envPrefix`? There is little validation here.
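A small sketch of the prefix stripping with the validation asked for here, under the assumption that a key equal to the bare prefix (i.e. an empty variable name) should simply be skipped. Names are illustrative, not the actual Utils code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative prefix-based extraction: keys that merely equal the prefix
// (empty variable name) are ignored instead of producing an empty key.
final class EnvVarExtractor {
    static Map<String, String> extract(String envPrefix, Map<String, String> config) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            // require at least one character after the prefix
            if (key.startsWith(envPrefix) && key.length() > envPrefix.length()) {
                result.put(key.substring(envPrefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```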




[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028625#comment-15028625
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45970591
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -221,4 +210,22 @@ public static void addToEnvironment(Map&lt;String, String&gt; environment,
private Utils() {
throw new RuntimeException();
}
+
+   /**
+* Method to extract environment variables from the flinkConfiguration 
based on the given prefix String.
+*
+* @param envPrefix Prefix for the environment variables key
+* @param flinkConfiguration The Flink config to get the environment 
variable definition from
+*/
+   public static Map&lt;String, String&gt; getEnvironmentVariables(String 
envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) {
+   Map&lt;String, String&gt; result = new HashMap&lt;&gt;();
+   for(Map.Entry&lt;String, String&gt; entry: 
flinkConfiguration.toMap().entrySet()) {
+   if(entry.getKey().startsWith(envPrefix)) {
+   // remove prefix
+   String key = 
entry.getKey().substring(envPrefix.length());
--- End diff --

What happens if the key is `envPrefix`? There is little validation here.


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This puts Flink at a serious disadvantage when customers need such a feature.





[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...

2015-11-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1409#issuecomment-159893248
  
Looks good except for some minor issues.




[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028627#comment-15028627
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1409#issuecomment-159893248
  
Looks good except for some minor issues.


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This puts Flink at a serious disadvantage when customers need such a feature.





[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028628#comment-15028628
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45970656
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

Ok. I guess it would make sense to use Storm's `Utils.DEFAULT_STREAM_ID` 
here? And maybe add a `public final static String DEFAULT_OPERATOR_ID` variable 
to `StormTuple`? What about using "defaultID" or "unspecified" instead of 
"componentID", or similar? Just to make it clear in case the name shows up in 
the UI.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[GitHub] flink pull request: [FLINK-3046] Integrate the Either Java type wi...

2015-11-26 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159893451
  
Thanks for testing my PR. Sorry for the bug. I forgot to test the most 
obvious case. 




[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028630#comment-15028630
 ] 

ASF GitHub Bot commented on FLINK-3046:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159893451
  
Thanks for testing my PR. Sorry for the bug. I forgot to test the most 
obvious case. 


> Integrate the Either Java type with the TypeExtractor
> -
>
> Key: FLINK-3046
> URL: https://issues.apache.org/jira/browse/FLINK-3046
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Vasia Kalavri
>Assignee: Timo Walther
>
> Integrate the Either Java type with the TypeExtractor, so that the APIs 
> recognize the type and choose the type info properly.





[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45971476
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

Fair enough, I use default id constants now.




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028638#comment-15028638
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45971476
  
--- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 ---
@@ -44,16 +45,30 @@
/** The schema (ie, ordered field names) of the tuple */
private final Fields schema;
 
+   private final int taskId;
+   private final String producerStreamId;
+   private final MessageId id;
+   private final String producerComponentId;
+
+
+   /**
+* Constructor which sets defaults for producerComponentId, taskId, and 
componentID
+* @param flinkTuple the Flink tuple
+* @param schema The schema of the storm fields
+*/
+   StormTuple(final IN flinkTuple, final Fields schema) {
+   this(flinkTuple, schema, -1, "testStream", "componentID");
+   }
--- End diff --

Fair enough, I use default id constants now.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...

2015-11-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45971856
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

The problem was that the `BoltWrapper` wouldn't create a `BoltCollector` if 
the bolt didn't define any output fields. That led to a NullPointerException.
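In other words, the wrapper dereferenced a collector that was never created. A minimal sketch of the guard that fixes this pattern (class and method names here are illustrative, not the actual `BoltWrapper` code):

```java
import java.util.ArrayList;
import java.util.List;

public class SafeEmitter {
    // null when the bolt declared no output fields, mirroring the bug described above
    private final List<Object> collector;

    public SafeEmitter(boolean hasOutputFields) {
        this.collector = hasOutputFields ? new ArrayList<>() : null;
    }

    /** Emits a record, silently dropping it when no collector exists. */
    public void emit(Object record) {
        if (collector != null) { // this guard avoids the NullPointerException
            collector.add(record);
        }
    }

    /** Number of records actually handed to the collector. */
    public int emittedCount() {
        return collector == null ? 0 : collector.size();
    }
}
```

With the guard in place, a bolt without declared output fields can still run without throwing.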




[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028646#comment-15028646
 ] 

ASF GitHub Bot commented on FLINK-2837:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1398#discussion_r45971856
  
--- Diff: 
flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Tweets can be filtered by keywords.
+ */
+public class PrintSampleStream {
+   public static void main(String[] args) throws Exception {
--- End diff --

The problem was that the `BoltWrapper` wouldn't create a `BoltCollector` if 
the bolt didn't define any output fields. That led to a NullPointerException.


> FlinkTopologyBuilder cannot handle multiple input streams
> -
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>   .shuffleGrouping(spoutId1)
>   .shuffleGrouping(spoutId2)
>   .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>   .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.





[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028665#comment-15028665
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45973129
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -243,6 +243,20 @@
 */
public static final String YARN_PROPERTIES_FILE_LOCATION = 
"yarn.properties-file.location";
 
+   /**
+* Prefix for passing custom environment variables to Flink's 
ApplicationMaster (JobManager).
+* For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
+*  yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+* in the flink-conf.yaml.
+*/
+   public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = 
"yarn.application-master.env.";
+
+   /**
+* Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this 
configuration prefix allows
+* setting custom environment variables.
+*/
+   public static final String YARN_TASK_MANAGER_ENV_PREFIX = 
"yarn.taskmanager.env.";
--- End diff --

I think `YARN_RESOURCE_MANAGER_ENV_PREFIX` is just wrong because it's a service 
of YARN, not of Flink 
(http://hortonworks.com/blog/apache-hadoop-yarn-resourcemanager/).

Whether to use AM or JM is a good question. In the YARN context, AM and JM 
are basically the same. I think AM is a bit more consistent.


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This has given Flink a serious disadvantage when customers need such a feature.





[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...

2015-11-26 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45973129
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -243,6 +243,20 @@
 */
public static final String YARN_PROPERTIES_FILE_LOCATION = 
"yarn.properties-file.location";
 
+   /**
+* Prefix for passing custom environment variables to Flink's 
ApplicationMaster (JobManager).
+* For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
+*  yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+* in the flink-conf.yaml.
+*/
+   public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = 
"yarn.application-master.env.";
+
+   /**
+* Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this 
configuration prefix allows
+* setting custom environment variables.
+*/
+   public static final String YARN_TASK_MANAGER_ENV_PREFIX = 
"yarn.taskmanager.env.";
--- End diff --

I think `YARN_RESOURCE_MANAGER_ENV_PREFIX` is just wrong because it's a service 
of YARN, not of Flink 
(http://hortonworks.com/blog/apache-hadoop-yarn-resourcemanager/).

Whether to use AM or JM is a good question. In the YARN context, AM and JM 
are basically the same. I think AM is a bit more consistent.




[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028668#comment-15028668
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45973217
  
--- Diff: 
flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java ---
@@ -97,6 +98,18 @@ public void tooMuchCutoff() {
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
 
+   @Test
+   public void testGetEnvironmentVariables() {
+   Configuration testConf = new Configuration();
+   testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
+
+   Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+   Assert.assertEquals(1, res.size());
+   Map.Entry<String, String> entry = res.entrySet().iterator().next();
+   Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
+   Assert.assertEquals("/usr/lib/native", entry.getValue());
+   }
--- End diff --

I think it's okay to assume that the method will work for all prefix strings.
This test is only about the method for extracting the variables.


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This has given Flink a serious disadvantage when customers need such a feature.





[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...

2015-11-26 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45973217
  
--- Diff: 
flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java ---
@@ -97,6 +98,18 @@ public void tooMuchCutoff() {
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
 
+   @Test
+   public void testGetEnvironmentVariables() {
+   Configuration testConf = new Configuration();
+   testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
+
+   Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+   Assert.assertEquals(1, res.size());
+   Map.Entry<String, String> entry = res.entrySet().iterator().next();
+   Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
+   Assert.assertEquals("/usr/lib/native", entry.getValue());
+   }
--- End diff --

I think it's okay to assume that the method will work for all prefix strings.
This test is only about the method for extracting the variables.




[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...

2015-11-26 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45973237
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -221,4 +210,22 @@ public static void addToEnvironment(Map<String, String> environment,
private Utils() {
throw new RuntimeException();
}
+
+   /**
+    * Method to extract environment variables from the flinkConfiguration based on the given prefix String.
+    *
+    * @param envPrefix Prefix for the environment variables key
+    * @param flinkConfiguration The Flink config to get the environment variable definition from
+    */
+   public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) {
+   Map<String, String> result = new HashMap<>();
+   for(Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
+   if(entry.getKey().startsWith(envPrefix)) {
+   // remove prefix
+   String key = entry.getKey().substring(envPrefix.length());
--- End diff --

I agree. I'll harden the method.
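For reference, a hardened version of the prefix extraction might look like the following. This is only a sketch: the class name `EnvPrefixExtractor`, the method name, and the plain `Map` parameter (instead of Flink's `Configuration`) are illustrative assumptions, not the actual patch.

```java
import java.util.HashMap;
import java.util.Map;

public class EnvPrefixExtractor {
    /**
     * Extracts entries whose key starts with the given prefix, with the
     * prefix stripped off. Entries whose key equals the prefix (which would
     * yield an empty variable name) are skipped rather than passed through.
     */
    public static Map<String, String> extractByPrefix(String envPrefix, Map<String, String> config) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            // require at least one character after the prefix
            if (key.startsWith(envPrefix) && key.length() > envPrefix.length()) {
                result.put(key.substring(envPrefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
        conf.put("yarn.application-master.env.", "ignored"); // empty name, skipped
        System.out.println(extractByPrefix("yarn.application-master.env.", conf));
    }
}
```

The empty-name check is one way the method could be "hardened" against degenerate keys.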




[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028670#comment-15028670
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1409#discussion_r45973237
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -221,4 +210,22 @@ public static void addToEnvironment(Map<String, String> environment,
private Utils() {
throw new RuntimeException();
}
+
+   /**
+    * Method to extract environment variables from the flinkConfiguration based on the given prefix String.
+    *
+    * @param envPrefix Prefix for the environment variables key
+    * @param flinkConfiguration The Flink config to get the environment variable definition from
+    */
+   public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) {
+   Map<String, String> result = new HashMap<>();
+   for(Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
+   if(entry.getKey().startsWith(envPrefix)) {
+   // remove prefix
+   String key = entry.getKey().substring(envPrefix.length());
--- End diff --

I agree. I'll harden the method.


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This has given Flink a serious disadvantage when customers need such a feature.





[jira] [Assigned] (FLINK-3081) Kafka Periodic Offset Committer does not properly terminate on canceling

2015-11-26 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-3081:
-

Assignee: Robert Metzger

> Kafka Periodic Offset Committer does not properly terminate on canceling
> 
>
> Key: FLINK-3081
> URL: https://issues.apache.org/jira/browse/FLINK-3081
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.0.0, 0.10.2
>
>
> The committer is only stopped at the end of the run method. Any termination 
> of the run method via an exception keeps the periodic committer thread 
> running.





[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...

2015-11-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1409#issuecomment-159907627
  
Thank you for the good review. I addressed some of your concerns.




[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028715#comment-15028715
 ] 

ASF GitHub Bot commented on FLINK-2954:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1409#issuecomment-159907627
  
Thank you for the good review. I addressed some of your concerns.


> Not able to pass custom environment variables in cluster to processes that 
> spawning TaskManager
> ---
>
> Key: FLINK-2954
> URL: https://issues.apache.org/jira/browse/FLINK-2954
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client, Distributed Runtime
>Affects Versions: 0.10.0
>Reporter: Jian Jiang
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> There are programs that rely on custom environment variables. In a Hadoop 
> MapReduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to 
> pass them. Similarly, in Spark
> we can use --conf 'spark.executor.XXX=value for XXX'. There is no such 
> feature yet in Flink.
> This has given Flink a serious disadvantage when customers need such a feature.





[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159910663
  
Please give me an hour or so to look at this :)




[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028762#comment-15028762
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159910663
  
Please give me an hour or so to look at this :)


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of current inflight checkpoints, as well as the checkpoint timeouts.





[jira] [Commented] (FLINK-3081) Kafka Periodic Offset Committer does not properly terminate on canceling

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028781#comment-15028781
 ] 

ASF GitHub Bot commented on FLINK-3081:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/1410

[FLINK-3081] Properly stop periodic Kafka committer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink3081

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1410


commit 70373b381fbcb5afc3a56f8610766cfe31eb963f
Author: Robert Metzger 
Date:   2015-11-26T13:25:54Z

[FLINK-3081] Properly stop periodic Kafka committer




> Kafka Periodic Offset Committer does not properly terminate on canceling
> 
>
> Key: FLINK-3081
> URL: https://issues.apache.org/jira/browse/FLINK-3081
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.0.0, 0.10.2
>
>
> The committer is only stopped at the end of the run method. Any termination 
> of the run method via an exception keeps the periodic committer thread 
> running.





[GitHub] flink pull request: [FLINK-3081] Properly stop periodic Kafka comm...

2015-11-26 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/1410

[FLINK-3081] Properly stop periodic Kafka committer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink3081

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1410


commit 70373b381fbcb5afc3a56f8610766cfe31eb963f
Author: Robert Metzger 
Date:   2015-11-26T13:25:54Z

[FLINK-3081] Properly stop periodic Kafka committer




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159915515
  
Looks good, and the minimum delay between checkpoints would be an extremely 
useful feature. What's missing for that?




[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028796#comment-15028796
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159915515
  
Looks good, and the minimum delay between checkpoints would be an extremely 
useful feature. What's missing for that?


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of current inflight checkpoints, as well as the checkpoint timeouts.





[GitHub] flink pull request: [FLINK-3046] Integrate the Either Java type wi...

2015-11-26 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159916143
  
Seems to work now :) :+1: 




[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028799#comment-15028799
 ] 

ASF GitHub Bot commented on FLINK-3046:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159916143
  
Seems to work now :) :+1: 


> Integrate the Either Java type with the TypeExtractor
> -
>
> Key: FLINK-3046
> URL: https://issues.apache.org/jira/browse/FLINK-3046
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Vasia Kalavri
>Assignee: Timo Walther
>
> Integrate the Either Java type with the TypeExtractor, so that the APIs 
> recognize the type and choose the type info properly.





[jira] [Created] (FLINK-3082) Confusing error about ManualTimestampSourceFunction

2015-11-26 Thread Niels Basjes (JIRA)
Niels Basjes created FLINK-3082:
---

 Summary: Confusing error about ManualTimestampSourceFunction
 Key: FLINK-3082
 URL: https://issues.apache.org/jira/browse/FLINK-3082
 Project: Flink
  Issue Type: Bug
Reporter: Niels Basjes


I wrote a source like this:
{code}
public class Foo extends RichSourceFunction {
{code}
and then did
{code}
ctx.collectWithTimestamp(event, event.eventTimestamp);
{code}

I got this error:
{code}
Caused by: java.lang.UnsupportedOperationException: Automatic-Timestamp sources 
cannot emit elements with a timestamp. See interface 
ManualTimestampSourceFunction if you want to manually assign timestamps to 
elements.
at 
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collectWithTimestamp(StreamSource.java:97)
{code}

After some digging it turns out that {{ManualTimestampSourceFunction}} was 
renamed to {{EventTimeSourceFunction}} and apparently the old name still 
lingers in this error message.







[jira] [Commented] (FLINK-3082) Confusing error about ManualTimestampSourceFunction

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028861#comment-15028861
 ] 

ASF GitHub Bot commented on FLINK-3082:
---

GitHub user nielsbasjes opened a pull request:

https://github.com/apache/flink/pull/1411

[FLINK-3082] Fixed confusing error about an interface that no longer exists

The ManualTimestampSourceFunction interface does not exist. 
Yet there are error messages that tell you to take a look at it.
This simply fixes these error messages.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nielsbasjes/flink FLINK-3082

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1411.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1411


commit 80d69afb855214bb41a1f257049c550dc4ca3859
Author: Niels Basjes 
Date:   2015-11-26T14:21:04Z

[FLINK-3082] Fixed confusing error about an interface that no longer exists




> Confusing error about ManualTimestampSourceFunction
> ---
>
> Key: FLINK-3082
> URL: https://issues.apache.org/jira/browse/FLINK-3082
> Project: Flink
>  Issue Type: Bug
>Reporter: Niels Basjes
>
> I wrote a source like this:
> {code}
> public class Foo extends RichSourceFunction {
> {code}
> and then did
> {code}
> ctx.collectWithTimestamp(event, event.eventTimestamp);
> {code}
> I got this error:
> {code}
> Caused by: java.lang.UnsupportedOperationException: Automatic-Timestamp 
> sources cannot emit elements with a timestamp. See interface 
> ManualTimestampSourceFunction if you want to manually assign timestamps to 
> elements.
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collectWithTimestamp(StreamSource.java:97)
> {code}
> After some digging it turns out that {{ManualTimestampSourceFunction}} was 
> renamed to {{EventTimeSourceFunction}} and apparently the old name still 
> lingers in this error message.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3082] Fixed confusing error about an in...

2015-11-26 Thread nielsbasjes
GitHub user nielsbasjes opened a pull request:

https://github.com/apache/flink/pull/1411

[FLINK-3082] Fixed confusing error about an interface that no longer exists

The ManualTimestampSourceFunction interface does not exist. 
Yet there are error messages that tell you to take a look at it.
This simply fixes these error messages.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nielsbasjes/flink FLINK-3082

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1411.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1411


commit 80d69afb855214bb41a1f257049c550dc4ca3859
Author: Niels Basjes 
Date:   2015-11-26T14:21:04Z

[FLINK-3082] Fixed confusing error about an interface that no longer exists




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159932475
  
Missing for that is a bit of code in the checkpoint coordinator that marks 
the time when checkpoints become possible again and adds that delay to the time 
when the next checkpoint is scheduled.

I'd suggest to add that as a followup...
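
A minimal sketch of that scheduling rule (the class and method names here are illustrative only, not the actual CheckpointCoordinator API): the next checkpoint trigger is deferred until a configured minimum pause has elapsed since the last completed checkpoint.

```java
// Hypothetical sketch, assuming a single-threaded caller; illustrative
// names, not Flink's real coordinator code.
public class MinPauseScheduler {
    private final long minPauseMillis;
    private long lastCompletionTime = Long.MIN_VALUE; // no checkpoint yet

    public MinPauseScheduler(long minPauseMillis) {
        this.minPauseMillis = minPauseMillis;
    }

    // Record the time at which a checkpoint completed and
    // checkpoints become possible again.
    public void onCheckpointComplete(long now) {
        lastCompletionTime = now;
    }

    // The next trigger fires at its scheduled time, or later if the
    // minimum pause since the last completion has not yet elapsed.
    public long nextTriggerTime(long scheduledTime) {
        if (lastCompletionTime == Long.MIN_VALUE) {
            return scheduledTime;
        }
        return Math.max(scheduledTime, lastCompletionTime + minPauseMillis);
    }
}
```
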


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028931#comment-15028931
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1408#issuecomment-159932475
  
Missing for that is a bit of code in the checkpoint coordinator that marks 
the time when checkpoints become possible again and adds that delay to the time 
when the next checkpoint is scheduled.

I'd suggest to add that as a followup...


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of concurrent inflight checkpoints, as well as the checkpoint timeouts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3081] Properly stop periodic Kafka comm...

2015-11-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1410#discussion_r45985979
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
 ---
@@ -415,13 +415,17 @@ public void run(SourceContext sourceContext) 
throws Exception {
long commitInterval = 
Long.valueOf(props.getProperty("auto.commit.interval.ms", "6"));
offsetCommitter = new 
PeriodicOffsetCommitter(commitInterval, this);
offsetCommitter.start();
+   offsetCommitter.setDaemon(true);
--- End diff --

I think that has to be before `start()` (otherwise it fails)
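
A standalone illustration of why the ordering matters (class and method names are just for the demo, not Flink code): `Thread.setDaemon` throws `IllegalThreadStateException` when called on a thread that is already alive, so it must be called before `start()`.

```java
import java.util.concurrent.CountDownLatch;

public class DaemonOrder {

    // Demonstrates the ordering constraint: setDaemon(true) on a
    // running thread throws IllegalThreadStateException.
    public static boolean setDaemonAfterStartThrows() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                release.await(); // keep the thread alive while we test
            } catch (InterruptedException ignored) {
            }
        });
        t.start();
        boolean threw = false;
        try {
            t.setDaemon(true); // too late: the thread is already running
        } catch (IllegalThreadStateException expected) {
            threw = true;
        }
        release.countDown();
        t.join();
        return threw;
    }
}
```
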


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3081] Properly stop periodic Kafka comm...

2015-11-26 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1410#issuecomment-159933351
  
One small comment, otherwise looks good...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3081) Kafka Periodic Offset Committer does not properly terminate on canceling

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028947#comment-15028947
 ] 

ASF GitHub Bot commented on FLINK-3081:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1410#discussion_r45985979
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
 ---
@@ -415,13 +415,17 @@ public void run(SourceContext sourceContext) 
throws Exception {
long commitInterval = 
Long.valueOf(props.getProperty("auto.commit.interval.ms", "6"));
offsetCommitter = new 
PeriodicOffsetCommitter(commitInterval, this);
offsetCommitter.start();
+   offsetCommitter.setDaemon(true);
--- End diff --

I think that has to be before `start()` (otherwise it fails)


> Kafka Periodic Offset Committer does not properly terminate on canceling
> 
>
> Key: FLINK-3081
> URL: https://issues.apache.org/jira/browse/FLINK-3081
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.0.0, 0.10.2
>
>
> The committer is only stopped at the end of the run method. Any termination 
> of the run method via an exception keeps the periodic committer thread 
> running.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2522] Adds streaming support for Flink-...

2015-11-26 Thread nikste
GitHub user nikste opened a pull request:

https://github.com/apache/flink/pull/1412

[FLINK-2522] Adds streaming support for Flink-Scala-Shell

Adds streaming collect support for scala

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nikste/flink Flink-2522_Scala_shell_streaming

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1412


commit b736c2a6fb5cdc487f6389f58b2b6cf969b96991
Author: Nikolaas Steenbergen 
Date:   2015-08-12T08:42:59Z

Adds streaming support for Flink-Scala-Shell
Adds streaming collect support for scala




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3083) Add docs how to configure streaming fault tolerance

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029059#comment-15029059
 ] 

ASF GitHub Bot commented on FLINK-3083:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1413

[FLINK-3083] [docs] Add docs on how to configure streaming fault tolerance

This documents the features of #1408 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink ft_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1413.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1413






> Add docs how to configure streaming fault tolerance
> ---
>
> Key: FLINK-3083
> URL: https://issues.apache.org/jira/browse/FLINK-3083
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3083] [docs] Add docs on how to configu...

2015-11-26 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1413

[FLINK-3083] [docs] Add docs on how to configure streaming fault tolerance

This documents the features of #1408 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink ft_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1413.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1413






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3084) File State Backend should not write very small state into files

2015-11-26 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3084:
---

 Summary: File State Backend should not write very small state into 
files
 Key: FLINK-3084
 URL: https://issues.apache.org/jira/browse/FLINK-3084
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.0.0


Currently, the {{FsStateBackend}} writes all state into files. Some state (like 
Kafka Offsets) is so small that it adds unnecessary overhead, and sometimes the 
checkpointed file handles are larger than the actual state.

Small state (below a certain threshold, say 1 KB) should not be stored in 
files, but directly in the state handles.
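
A hedged sketch of the proposed threshold behavior (illustrative names, not the real FsStateBackend code): state below the size threshold stays inline in the handle, larger state is written to a file.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical sketch of the FLINK-3084 idea, assuming a 1 KB
// threshold as suggested in the issue; not Flink's actual backend.
public class ThresholdStateWriter {
    static final int THRESHOLD = 1024; // 1 KB

    // Returns true if the state was written to a file, false if it is
    // small enough to be kept inline in the state handle instead.
    public static boolean store(byte[] state, Path dir) throws Exception {
        if (state.length < THRESHOLD) {
            // Inline handle: the bytes travel with the checkpoint
            // metadata, avoiding a file per tiny piece of state.
            return false;
        }
        Path file = dir.resolve(UUID.randomUUID().toString());
        Files.write(file, state);
        return true;
    }
}
```
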



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3085) Move State Backend Initialization from "registerInputOutput()" to "invoke()"

2015-11-26 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3085:
---

 Summary: Move State Backend Initialization from 
"registerInputOutput()" to "invoke()"
 Key: FLINK-3085
 URL: https://issues.apache.org/jira/browse/FLINK-3085
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10.0
Reporter: Stephan Ewen
 Fix For: 1.0.0


The state backend initialization currently happens in the {{StreamTask}} in 
{{registerInputOutput()}}. For better error handling, it should be part of 
{{invoke()}}, where the task is properly interrupted, threads are properly 
joined, and exceptions are handled with awareness of cancellation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3082) Confusing error about ManualTimestampSourceFunction

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029089#comment-15029089
 ] 

ASF GitHub Bot commented on FLINK-3082:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1411#issuecomment-159956326
  
Thanks for catching this.

Will merge this...


> Confusing error about ManualTimestampSourceFunction
> ---
>
> Key: FLINK-3082
> URL: https://issues.apache.org/jira/browse/FLINK-3082
> Project: Flink
>  Issue Type: Bug
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>
> I wrote a source like this:
> {code}
> public class Foo extends RichSourceFunction {
> {code}
> and then did
> {code}
> ctx.collectWithTimestamp(event, event.eventTimestamp);
> {code}
> I got this error:
> {code}
> Caused by: java.lang.UnsupportedOperationException: Automatic-Timestamp 
> sources cannot emit elements with a timestamp. See interface 
> ManualTimestampSourceFunction if you want to manually assign timestamps to 
> elements.
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collectWithTimestamp(StreamSource.java:97)
> {code}
> After some digging it turns out that {{ManualTimestampSourceFunction}} was 
> renamed to {{EventTimeSourceFunction}} and apparently the old name still 
> lingers in this error message.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029136#comment-15029136
 ] 

ASF GitHub Bot commented on FLINK-3051:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1408


> Define a maximum number of concurrent inflight checkpoints
> --
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The checkpoint coordinator should define an option to limit the maximum 
> number of concurrent inflight checkpoints, as well as the checkpoint timeouts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...

2015-11-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1408


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations

2015-11-26 Thread Timo Walther (JIRA)
Timo Walther created FLINK-3086:
---

 Summary: ExpressionParser does not support concatenation of suffix 
operations
 Key: FLINK-3086
 URL: https://issues.apache.org/jira/browse/FLINK-3086
 Project: Flink
  Issue Type: Bug
  Components: Table API
Reporter: Timo Walther


The ExpressionParser of the Table API does not support concatenation of suffix 
operations, e.g. {code}table.select("field.cast(STRING).substring(2)"){code} 
throws an exception.

{code}
org.apache.flink.api.table.ExpressionException: Could not parse expression: 
string matching regex `\z' expected but `.' found
at 
org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224)
{code}

However, the Scala implicit Table Expression API supports this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3046] Integrate the Either Java type wi...

2015-11-26 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159983550
  
Will merge this tomorrow if no objections are raised.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor

2015-11-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029211#comment-15029211
 ] 

ASF GitHub Bot commented on FLINK-3046:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1393#issuecomment-159983550
  
Will merge this tomorrow if no objections are raised.


> Integrate the Either Java type with the TypeExtractor
> -
>
> Key: FLINK-3046
> URL: https://issues.apache.org/jira/browse/FLINK-3046
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Vasia Kalavri
>Assignee: Timo Walther
>
> Integrate the Either Java type with the TypeExtractor, so that the APIs 
> recognize the type and choose the type info properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

