[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints
[ https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028376#comment-15028376 ] ASF GitHub Bot commented on FLINK-3051: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159851509 Good addition! More control over our checkpointing is something people have been asking me about at talks. Could you add a few sentences to the documentation about this? > Define a maximum number of concurrent inflight checkpoints > -- > > Key: FLINK-3051 > URL: https://issues.apache.org/jira/browse/FLINK-3051 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.0.0 > > > The checkpoint coordinator should define an option to limit the maximum > number of concurrent inflight checkpoints, as well as the checkpoint timeouts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
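The limiting behavior discussed in this ticket can be sketched independently of Flink's actual CheckpointCoordinator. This is a minimal illustration, not the PR's implementation; the class and method names below are invented, and it assumes the coordinator simply declines to trigger a new checkpoint while the configured number are still in flight:

```java
import java.util.HashSet;
import java.util.Set;

/** Toy sketch of a coordinator that caps the number of in-flight checkpoints. */
public class CheckpointLimiter {
    private final int maxConcurrent;
    private final Set<Long> pending = new HashSet<>();

    public CheckpointLimiter(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    /** Returns true if the checkpoint was triggered, false if declined. */
    public synchronized boolean triggerCheckpoint(long checkpointId) {
        if (pending.size() >= maxConcurrent) {
            // Too many checkpoints in flight: decline this trigger.
            return false;
        }
        return pending.add(checkpointId);
    }

    /** Called when a checkpoint completes or times out, freeing a slot. */
    public synchronized void acknowledgeCheckpoint(long checkpointId) {
        pending.remove(checkpointId);
    }

    public static void main(String[] args) {
        CheckpointLimiter limiter = new CheckpointLimiter(2);
        System.out.println(limiter.triggerCheckpoint(1)); // true
        System.out.println(limiter.triggerCheckpoint(2)); // true
        System.out.println(limiter.triggerCheckpoint(3)); // false: limit reached
        limiter.acknowledgeCheckpoint(1);
        System.out.println(limiter.triggerCheckpoint(3)); // true: a slot freed up
    }
}
```

A real coordinator would also track checkpoint timeouts (the second knob the ticket mentions) so that a hung checkpoint eventually releases its slot.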
[GitHub] flink pull request: [FLINK-2488] Expose Attempt Number in RuntimeC...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1386#issuecomment-159853758 I'll wait for Stephan to review this then. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159866582 Where in the docs would be the best place for that? - A new entry in the "Programming Guides" menu? - Or a section in the streaming guide? I would vote for the first.
[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints
[ https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028464#comment-15028464 ] ASF GitHub Bot commented on FLINK-3051: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159866582 Where in the docs would be the best place for that? - A new entry in the "Programming Guides" menu? - Or a section in the streaming guide? I would vote for the first.
[jira] [Commented] (FLINK-2960) Communicate with JobManager through YARN proxy
[ https://issues.apache.org/jira/browse/FLINK-2960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028477#comment-15028477 ] Robert Metzger commented on FLINK-2960: --- I'm sorry, it won't happen again. > Communicate with JobManager through YARN proxy > -- > > Key: FLINK-2960 > URL: https://issues.apache.org/jira/browse/FLINK-2960 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 0.10.0 >Reporter: Maximilian Michels > > In a secured environment, ports on the ApplicationMaster may be blocked. > Thus, runtime messages have to pass through YARN interfaces and then be > forwarded to the ApplicationMaster/JobManager.
[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159868163 I think it fits best into the "Fault Tolerance" section of the Streaming Guide: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#fault-tolerance If you think that section will grow too big, we could move it to a new page and link it from the streaming guide.
[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028486#comment-15028486 ] ASF GitHub Bot commented on FLINK-3046: --- Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159869432 Maybe I am not aware of the limitations, but things like this don't seem to work: TypeExtractor.getForObject(Either.left("")); source.map(new MapFunction<Long, Either<String, Long>>() { @Override public Either<String, Long> map(Long value) throws Exception { return null; } }); > Integrate the Either Java type with the TypeExtractor > - > > Key: FLINK-3046 > URL: https://issues.apache.org/jira/browse/FLINK-3046 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Affects Versions: 1.0.0 >Reporter: Vasia Kalavri >Assignee: Timo Walther > > Integrate the Either Java type with the TypeExtractor, so that the APIs > recognize the type and choose the type info properly.
[GitHub] flink pull request: [FLINK-3046] Integrate the Either Java type wi...
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159869432 Maybe I am not aware of the limitations, but things like this don't seem to work: TypeExtractor.getForObject(Either.left("")); source.map(new MapFunction<Long, Either<String, Long>>() { @Override public Either<String, Long> map(Long value) throws Exception { return null; } });
[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028504#comment-15028504 ] ASF GitHub Bot commented on FLINK-3046: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159871232 The second variant should work (in the MapFunction signature) - if not, that is a bug. The first variant cannot work because the type for "right" is nowhere available in a non-erased form. As a replacement for TypeExtractor.getForObject() I suggested this: https://issues.apache.org/jira/browse/FLINK-2788
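Stephan's point is plain Java type erasure: an object alone carries only its raw runtime class, so an extractor cannot recover Either's type arguments from a value, while a declared function signature (or an anonymous subclass, the approach proposed in FLINK-2788) keeps them in the class file. A minimal sketch of both sides of that distinction; the class and method names here are invented for illustration and are not Flink API:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;

public class ErasureDemo {
    /** Mimics TypeExtractor.getForObject: only the raw class survives at runtime. */
    public static Class<?> rawTypeOf(Object value) {
        return value.getClass();
    }

    /**
     * The anonymous-subclass trick (cf. FLINK-2788): the supertype's type
     * argument is recorded in the class file and is recoverable via reflection.
     */
    public static Type capturedTypeArgument(Object anonymousSubclassInstance) {
        ParameterizedType superType =
                (ParameterizedType) anonymousSubclassInstance.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // Erasure: both lists have the identical runtime class, so inspecting
        // the object alone cannot tell String from Integer.
        System.out.println(
                rawTypeOf(new ArrayList<String>()) == rawTypeOf(new ArrayList<Integer>()));

        // An anonymous subclass pins the type argument in a non-erased form.
        ArrayList<String> hinted = new ArrayList<String>() {};
        System.out.println(capturedTypeArgument(hinted)); // class java.lang.String
    }
}
```

This is why Either.left("") can never yield the "right" type, no matter how clever the extractor: that information simply does not exist at runtime.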
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963943 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java --- @@ -33,7 +30,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashMap; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[GitHub] flink pull request: [FLINK-3046] Integrate the Either Java type wi...
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159871753 It gives the error: Usage of class Either as a type is not allowed. Use a concrete subclass instead.
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963933 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- The use of null is often problematic. I prefer default values.
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028505#comment-15028505 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963921 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is. > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of unioning the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. > For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources.
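The union semantics the bug report asks for can be sketched with plain Java streams, independent of the Storm/Flink APIs (the class and method names here are invented for illustration): a single consumer with several groupings should see the elements of all upstream sources, not the elements of just one replica's input:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionSemantics {
    /**
     * Correct multi-input behavior: one logical consumer observes ALL
     * elements from every upstream source, concatenated into one stream.
     */
    public static List<Integer> union(List<Integer> a, List<Integer> b, List<Integer> c) {
        return Stream.of(a, b, c)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> merged =
                union(Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));
        // All five elements arrive, mirroring what MergerBolt should receive
        // from its three spouts; the buggy behavior would surface only one list.
        System.out.println(merged.size()); // 5
    }
}
```

The reported bug corresponds to each replicated bolt instance seeing only one of the three inputs, so the sink prints a single source's data instead of the merged stream.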
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028508#comment-15028508 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963943 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java --- @@ -33,7 +30,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashMap; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[jira] [Commented] (FLINK-3055) ExecutionVertex has duplicate method getParallelSubtaskIndex and getSubTaskIndex
[ https://issues.apache.org/jira/browse/FLINK-3055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028509#comment-15028509 ] Chesnay Schepler commented on FLINK-3055: - If I saw it correctly, [~rmetzger] added the getSubTaskIndex() method at a time when getParallelSubtaskIndex() already existed. Maybe he can chime in on why he added the new method? (although it may be quite a bit in the past) > ExecutionVertex has duplicate method getParallelSubtaskIndex and > getSubTaskIndex > > > Key: FLINK-3055 > URL: https://issues.apache.org/jira/browse/FLINK-3055 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: jun aoki >Priority: Trivial > > In {{ExecutionVertex}}: > {code} > public int getSubTaskIndex() { > return subTaskIndex; > } > public int getParallelSubtaskIndex() { > return this.subTaskIndex; > } > {code}
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964023 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java --- @@ -17,12 +17,12 @@ package org.apache.flink.storm.wrappers; -import java.util.HashMap; - --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028511#comment-15028511 ] ASF GitHub Bot commented on FLINK-3046: --- Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159871753 It gives the error: Usage of class Either as a type is not allowed. Use a concrete subclass instead.
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028507#comment-15028507 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45963933 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- The use of null is often problematic. I prefer default values.
[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159872110 Should we pull that into a separate document? It is becoming quite large...
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028513#comment-15028513 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964023 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java --- @@ -17,12 +17,12 @@ package org.apache.flink.storm.wrappers; -import java.util.HashMap; - --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints
[ https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028514#comment-15028514 ] ASF GitHub Bot commented on FLINK-3051: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159872110 Should we pull that into a separate document? It is becoming quite large...
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964241 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java --- @@ -18,9 +18,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159872462 +1
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028516#comment-15028516 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964241 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java --- @@ -18,9 +18,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints
[ https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028518#comment-15028518 ] ASF GitHub Bot commented on FLINK-3051: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159872462 +1
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028523#comment-15028523 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964610 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore --- End diff -- ok
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964610 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore --- End diff -- ok
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964692 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore + public void testFieldsGroupingOnMultipleSpoutOutputStreams() { + TopologyBuilder builder = new TopologyBuilder(); - Assert.assertEquals(0, topology.getNumberOfTasks()); + builder.setSpout("spout", new TestDummySpout()); + builder.setBolt("sink", new TestSink()).fieldsGrouping("spout", + TestDummySpout.spoutStreamId, new Fields("id")); - topology.increaseNumberOfTasks(3); - Assert.assertEquals(3, topology.getNumberOfTasks()); + FlinkTopology.createTopology(builder); + } - topology.increaseNumberOfTasks(2); - Assert.assertEquals(5, topology.getNumberOfTasks()); + @Test + @Ignore --- End diff -- ok
[jira] [Commented] (FLINK-3055) ExecutionVertex has duplicate method getParallelSubtaskIndex and getSubTaskIndex
[ https://issues.apache.org/jira/browse/FLINK-3055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028522#comment-15028522 ] Robert Metzger commented on FLINK-3055: --- I don't know why I added a second getter for this. Most likely I just overlooked the existing getParallelSubtaskIndex. Since getParallelSubtaskIndex is used in so many places, I would probably keep this one. > ExecutionVertex has duplicate method getParallelSubtaskIndex and > getSubTaskIndex > > > Key: FLINK-3055 > URL: https://issues.apache.org/jira/browse/FLINK-3055 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: jun aoki >Priority: Trivial > > In {{ExecutionVertex}}: > {code} > public int getSubTaskIndex() { > return subTaskIndex; > } > public int getParallelSubtaskIndex() { > return this.subTaskIndex; > } > {code}
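The cleanup suggested in the comment above can be sketched in plain Java (the class name here is hypothetical and only mirrors the quoted snippet, not Flink's actual ExecutionVertex): keep getParallelSubtaskIndex(), since it has the most call sites, and let the redundant getter delegate to it until remaining callers are migrated.

```java
// Hypothetical sketch of the proposed deduplication. Field and getter names
// mirror the snippet quoted in FLINK-3055; this is not Flink source code.
class ExecutionVertexSketch {
    private final int subTaskIndex;

    ExecutionVertexSketch(int subTaskIndex) {
        this.subTaskIndex = subTaskIndex;
    }

    // The getter to keep, as suggested in the comment above.
    public int getParallelSubtaskIndex() {
        return subTaskIndex;
    }

    // The duplicate: deprecated and delegating, so both getters stay
    // consistent while call sites are moved over.
    @Deprecated
    public int getSubTaskIndex() {
        return getParallelSubtaskIndex();
    }
}
```

Delegation rather than outright removal keeps the change binary-compatible until all callers of getSubTaskIndex are updated.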
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028525#comment-15028525 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964692 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java --- @@ -14,50 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.storm.api; -import org.apache.flink.storm.api.FlinkTopology; -import org.junit.Assert; + +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Ignore; import org.junit.Test; public class FlinkTopologyTest { - @Test - public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(); - Assert.assertEquals(1, topology.getParallelism()); + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecute() throws Exception { - new FlinkTopology().execute(); + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + + FlinkTopology.createTopology(builder); } - @Test(expected = UnsupportedOperationException.class) - public void testExecuteWithName() throws Exception { - new FlinkTopology().execute(null); + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + + FlinkTopology.createTopology(builder); } @Test - public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(); + @Ignore + public void testFieldsGroupingOnMultipleSpoutOutputStreams() { + TopologyBuilder builder = new TopologyBuilder(); - Assert.assertEquals(0, topology.getNumberOfTasks()); + builder.setSpout("spout", new TestDummySpout()); + builder.setBolt("sink", new TestSink()).fieldsGrouping("spout", + TestDummySpout.spoutStreamId, new Fields("id")); - topology.increaseNumberOfTasks(3); - Assert.assertEquals(3, topology.getNumberOfTasks()); + FlinkTopology.createTopology(builder); + } - topology.increaseNumberOfTasks(2); - Assert.assertEquals(5, topology.getNumberOfTasks()); + @Test + @Ignore --- End diff -- ok > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. 
> For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); >
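The intended semantics behind the FLINK-2837 bug report can be illustrated outside Storm and Flink entirely: a bolt subscribed to several spouts via shuffleGrouping should consume the union of all its input streams, not one replicated stream each. A minimal plain-Java sketch (lists stand in for streams; the class and method names are invented for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the *expected* behavior described in FLINK-2837: every element
// from every source must reach the single consumer. With the bug, each
// replicated consumer instance would see only one source's elements.
class UnionSemanticsSketch {
    static List<Integer> union(List<List<Integer>> sources) {
        return sources.stream()
                .flatMap(List::stream)          // merge all sources...
                .collect(Collectors.toList());  // ...into one consumer input
    }
}
```

With three sources, as in the FiniteRandomSpout example above, the merging bolt should therefore observe elements from all three, and the sink should print data from every source rather than just one.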
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964776 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java --- @@ -16,14 +16,14 @@ */ package org.apache.flink.storm.api; -import java.util.Map; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import java.util.Map; + public class TestBolt implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964866 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; +import java.util.Map; + public class TestDummySpout implements IRichSpout { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964832 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java --- @@ -16,13 +16,13 @@ */ package org.apache.flink.storm.api; -import java.util.Map; - import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; +import java.util.Map; + public class TestSpout implements IRichSpout { private static final long serialVersionUID = -4884029383198924007L; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028527#comment-15028527 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964776 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java --- @@ -16,14 +16,14 @@ */ package org.apache.flink.storm.api; -import java.util.Map; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import java.util.Map; + public class TestBolt implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is. > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. > For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources. 
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964853 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import java.util.Map; + public class TestDummyBolt implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964876 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java --- @@ -16,16 +16,16 @@ */ package org.apache.flink.storm.util; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + public class TestSink implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028529#comment-15028529 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964853 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import java.util.Map; + public class TestDummyBolt implements IRichBolt { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is. > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. > For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028530#comment-15028530 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964866 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java --- @@ -26,6 +24,8 @@ import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; +import java.util.Map; + public class TestDummySpout implements IRichSpout { --- End diff -- It follows the import style of the other classes, so I'll leave this as it is. > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. > For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028532#comment-15028532 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45964904 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java --- @@ -21,7 +21,6 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.tuple.Fields; - import org.apache.flink.api.common.ExecutionConfig; --- End diff -- It follows the import style of the other classes, so I'll leave this as it is. > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. > For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45965316 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java --- @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. + */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { --- End diff -- I cannot see why it did not work before. Can you explain what the problem was?
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028535#comment-15028535 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45965316 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java --- @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. + */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { --- End diff -- I cannot see why it did not work before? Can you explain what the problem was? 
> FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. > For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints
[ https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028538#comment-15028538 ] ASF GitHub Bot commented on FLINK-3051: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159876995 Will do this in a separate pull request as a follow-up. Any concerns about merging this? > Define a maximum number of concurrent inflight checkpoints > -- > > Key: FLINK-3051 > URL: https://issues.apache.org/jira/browse/FLINK-3051 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.0.0 > > > The checkpoint coordinator should define an option to limit the maximum > number of current inflight checkpoints, as well as the checkpoint timeouts.
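Conceptually, the limit discussed in FLINK-3051 amounts to a counting semaphore in the checkpoint coordinator: a new checkpoint may only be triggered while a permit is free, and completing, discarding, or timing out a checkpoint returns the permit. The following is a plain-Java sketch of that idea (an invented class, not the CheckpointCoordinator's actual code):

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of capping inflight checkpoints with a semaphore.
// This illustrates the mechanism only; Flink's real coordinator tracks
// pending checkpoints and timeouts with considerably more machinery.
class CheckpointLimiterSketch {
    private final Semaphore inflight;

    CheckpointLimiterSketch(int maxConcurrentCheckpoints) {
        this.inflight = new Semaphore(maxConcurrentCheckpoints);
    }

    // Returns true if a new checkpoint may start; false means the
    // trigger is declined because the concurrency limit is reached.
    boolean tryTrigger() {
        return inflight.tryAcquire();
    }

    // Called when a checkpoint completes, is discarded, or times out,
    // freeing a slot for the next trigger.
    void onCheckpointFinished() {
        inflight.release();
    }
}
```

Declining (rather than queueing) a trigger when no permit is free matches the non-blocking tryAcquire used here; a timeout handler would simply call onCheckpointFinished for the abandoned checkpoint.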
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45965523 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- I see. DataArtisans committers can make any change, but external contributors get bullied if they apply similar changes... It is not against you or the change itself -- it unifies the style, which does make sense. But I got bullied multiple times in other PRs when I did similar stuff...
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028537#comment-15028537 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45965523 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- I see. DataArtians committer can do any change, but external committers get bullied if they apply similar changes... It is not against you or the change itself -- it unifies the style which does make sense. But I got bullied multiple times in other PRs when I did similar stuff... > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. > For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources. 
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45966621 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java --- @@ -20,11 +20,9 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; - --- End diff -- It follows the import style of the other classes, so I'll leave this as it is.
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45967336 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- Not sure who is bullying whom :) Look at the classes and you will see that all imports are arranged like this. We want to be consistent, right? According to your suggestion, I changed the other import statements which were just reformatting. Open source is often about compromises. Very rarely you will find that the code style of a person reflects exactly how you would do it. I'm making compromises and changing things as you like them. That's fine for me. Please don't give me a harder time by blaming my employer. I'm not aware I have done something like this to you. Next time you get blamed for something like this, please contact me and I'll try to help you. I don't think this is the right place to sort out things. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-3081) Kafka Periodic Offset Committer does not properly terminate on canceling
Stephan Ewen created FLINK-3081: --- Summary: Kafka Periodic Offset Committer does not properly terminate on canceling Key: FLINK-3081 URL: https://issues.apache.org/jira/browse/FLINK-3081 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 0.10.1 Reporter: Stephan Ewen Priority: Blocker Fix For: 1.0.0, 0.10.2 The committer is only stopped at the end of the run method. Any termination of the run method via an exception keeps the periodic committer thread running. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
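The fix pattern the ticket implies can be sketched in a few lines: stop the periodic committer in a `finally` block so that exceptional exits of the fetch loop also terminate the thread. All class and method names below are illustrative; this is not the actual Kafka connector code.

```java
// Illustrative sketch (not the actual Flink Kafka connector): a periodic
// worker thread that is guaranteed to stop even when the fetch loop exits
// with an exception, because the shutdown happens in a finally block.
public class PeriodicCommitterExample {
    static class PeriodicCommitter extends Thread {
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(50); // placeholder for "commit offsets periodically"
                } catch (InterruptedException e) {
                    // interruption is the shutdown signal; the loop re-checks 'running'
                }
            }
        }

        void shutdown() {
            running = false;
            interrupt();
        }
    }

    public static void runFetchLoop(boolean fail) {
        PeriodicCommitter committer = new PeriodicCommitter();
        committer.start();
        try {
            if (fail) {
                throw new RuntimeException("simulated fetch failure");
            }
            // normal fetch-loop work would happen here
        } finally {
            // Runs on both normal and exceptional exit, so the committer
            // thread never outlives the task.
            committer.shutdown();
            try {
                committer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        try {
            runFetchLoop(true);
        } catch (RuntimeException expected) {
            System.out.println("committer stopped despite failure");
        }
    }
}
```

If the shutdown were placed at the end of the try block instead, the simulated failure would skip it and the thread would keep running, which is the bug the ticket describes.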
[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager
[ https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028613#comment-15028613 ] ASF GitHub Bot commented on FLINK-2954: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1409#discussion_r45970338 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -243,6 +243,20 @@ */ public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location"; + /** +* Prefix for passing custom environment variables to Flink's ApplicationMaster (JobManager). +* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: +* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" +* in the flink-conf.yaml. +*/ + public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env."; + + /** +* Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows +* setting custom environment variables. +*/ + public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; --- End diff -- I wonder about the naming here. Maybe this should be `YARN_RESOURCE_MANAGER_ENV_PREFIX`? Or change `YARN_APPLICATION_MASTER_ENV_PREFIX` to `YARN_JOB_MANAGER_ENV_PREFIX`? > Not able to pass custom environment variables in cluster to processes that > spawning TaskManager > --- > > Key: FLINK-2954 > URL: https://issues.apache.org/jira/browse/FLINK-2954 > Project: Flink > Issue Type: Bug > Components: Command-line client, Distributed Runtime >Affects Versions: 0.10.0 >Reporter: Jian Jiang >Assignee: Robert Metzger >Priority: Critical > Fix For: 1.0.0 > > > There are programs that rely on custom environment variables. In hadoop > mapreduce job we can use -Dmapreduce.map.env and - Dmapreduce.reduce.env to > do pass them. Similarly in Spark > we can use --conf 'spark.executor.XXX=value for XXX'. There is no such > feature yet in Flink. 
> This has given Flink a serious disadvantage when customers need such feature. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1409#discussion_r45970372 --- Diff: flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java --- @@ -97,6 +98,18 @@ public void tooMuchCutoff() { Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } + @Test + public void testGetEnvironmentVariables() { + Configuration testConf = new Configuration(); + testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native"); + + Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf); + + Assert.assertEquals(1, res.size()); + Map.Entry<String, String> entry = res.entrySet().iterator().next(); + Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey()); + Assert.assertEquals("/usr/lib/native", entry.getValue()); + } --- End diff -- There is no test for the task manager variables.
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45970389 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java --- @@ -27,13 +27,12 @@ import backtype.storm.state.ISubscribedState; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; +import clojure.lang.Atom; --- End diff -- Good, we are on the same page. And I don't want to bully you! I just mentioned the classes that do not contain any actual code change -- actually, according to the coding guidelines, there should be no import-order changes even in the classes with code changes -- I did not comment on those, just on the classes with pure reformatting. I like consistency, so please apply the changes to all classes. But when I did import reorderings or made code formatting consistent (where it was inconsistent), I was always told "don't do this". So if it is a general rule, I just point it out here, too. I did not come up with the rule. And I never force my own code style -- I always adapt to the given style. :) It's really about time to get a proper Maven formatting tool running to get rid of all these stupid discussions. (And as I said already: "It is not against you or the change itself" -- but the process seems to be inconsistent -- people follow the rules more or less strictly.)
[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1409#discussion_r45970591 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -221,4 +210,22 @@ public static void addToEnvironment(Map<String, String> environment, private Utils() { throw new RuntimeException(); } + + /** +* Method to extract environment variables from the flinkConfiguration based on the given prefix String. +* +* @param envPrefix Prefix for the environment variables key +* @param flinkConfiguration The Flink config to get the environment variable definition from +*/ + public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) { + Map<String, String> result = new HashMap<>(); + for (Map.Entry<String, String> entry : flinkConfiguration.toMap().entrySet()) { + if (entry.getKey().startsWith(envPrefix)) { + // remove prefix + String key = entry.getKey().substring(envPrefix.length()); --- End diff -- What happens if the key is `envPrefix`? There is little validation here.
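The prefix-extraction logic under review, including the guard the reviewer asks about, can be sketched with plain collections. The names below are illustrative, not the actual Flink `Utils` API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of prefix-based environment-variable extraction,
// similar in spirit to the PR's getEnvironmentVariables but with an extra
// guard: a config key that is exactly the prefix is skipped, since it
// would otherwise produce an empty variable name.
public class PrefixEnvExtractor {
    public static Map<String, String> extract(String envPrefix, Map<String, String> config) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(envPrefix)) {
                // strip the prefix to obtain the environment variable name
                String key = entry.getKey().substring(envPrefix.length());
                if (!key.isEmpty()) {
                    result.put(key, entry.getValue());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
        conf.put("yarn.application-master.env.", "ignored");   // exact-prefix key is skipped
        conf.put("taskmanager.heap.mb", "1024");               // unrelated key is skipped
        System.out.println(extract("yarn.application-master.env.", conf));
        // prints {LD_LIBRARY_PATH=/usr/lib/native}
    }
}
```

The empty-key guard is one way to answer the "what happens if the key is `envPrefix`" question; rejecting such a key with an exception would be an equally valid design choice.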
[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1409#issuecomment-159893248 Looks good except for some minor issues.
[jira] [Commented] (FLINK-2837) FlinkTopologyBuilder cannot handle multiple input streams
[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028628#comment-15028628 ] ASF GitHub Bot commented on FLINK-2837: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45970656 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- Ok. I guess it would make sense to use Storm's `Utils.DEFAULT_STREAM_ID` here? And maybe add a `public final static String DEFAULT_OPERATOR_ID` variable to `StormTuple`? What about using "defaultID" or "unspecified" instead of "componentID" or similar? Just to make it clear if the name shows up in the UI? > FlinkTopologyBuilder cannot handle multiple input streams > - > > Key: FLINK-2837 > URL: https://issues.apache.org/jira/browse/FLINK-2837 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Maximilian Michels > > FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead > of union the incoming streams, it replicates the consuming bolt and each > (logical) instance processes one of the input streams. 
> For example: > {noformat} > final FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); > builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10)); > builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8)); > builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13)); > builder.setBolt(boltId, new MergerBolt()) > .shuffleGrouping(spoutId1) > .shuffleGrouping(spoutId2) > .shuffleGrouping(spoutId3); > builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter())) > .shuffleGrouping(boltId); > {noformat} > will only print the data from a single source instead of all sources. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028630#comment-15028630 ] ASF GitHub Bot commented on FLINK-3046: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159893451 Thanks for testing my PR. Sorry for the bug. I forgot to test the most obvious case. > Integrate the Either Java type with the TypeExtractor > - > > Key: FLINK-3046 > URL: https://issues.apache.org/jira/browse/FLINK-3046 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Affects Versions: 1.0.0 >Reporter: Vasia Kalavri >Assignee: Timo Walther > > Integrate the Either Java type with the TypeExtractor, so that the APIs > recognize the type and choose the type info properly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45971476 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -44,16 +45,30 @@ /** The schema (ie, ordered field names) of the tuple */ private final Fields schema; + private final int taskId; + private final String producerStreamId; + private final MessageId id; + private final String producerComponentId; + + + /** +* Constructor which sets defaults for producerComponentId, taskId, and componentID +* @param flinkTuple the Flink tuple +* @param schema The schema of the storm fields +*/ + StormTuple(final IN flinkTuple, final Fields schema) { + this(flinkTuple, schema, -1, "testStream", "componentID"); + } --- End diff -- Fair enough, I use default id constants now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2837][storm] various improvements for S...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1398#discussion_r45971856 --- Diff: flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java --- @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.print; + +import backtype.storm.Config; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.apache.flink.storm.api.FlinkLocalCluster; +import org.apache.flink.storm.api.FlinkTopology; +import storm.starter.bolt.PrinterBolt; +import storm.starter.spout.TwitterSampleSpout; + +import java.util.Arrays; + +/** + * Prints incoming tweets. Tweets can be filtered by keywords. + */ +public class PrintSampleStream { + public static void main(String[] args) throws Exception { --- End diff -- The problem was that the `BoltWrapper` wouldn't create a `BoltCollector` if the bolt didn't define any output fields. That led to a NullPointerException. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
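The failure mode described above -- a missing collector for a bolt that declares no output fields, leading to a NullPointerException -- can be illustrated with a minimal stdlib sketch. All names here are hypothetical, not the actual `BoltWrapper` API:

```java
import java.util.List;

// Illustrative sketch (not the actual BoltWrapper code): substitute a no-op
// collector when a component declares no output fields, so a later emit()
// call cannot hit a NullPointerException.
public class CollectorGuardExample {
    interface Collector {
        void emit(List<Object> tuple);
    }

    // Sink-only components get a collector that silently discards output.
    static final Collector NO_OP = tuple -> { };

    // Always returns a usable collector, even for an empty or null field list.
    static Collector createCollector(List<String> declaredFields) {
        if (declaredFields == null || declaredFields.isEmpty()) {
            return NO_OP;
        }
        return tuple -> System.out.println("emitting " + tuple);
    }

    public static void main(String[] args) {
        Collector c = createCollector(List.of()); // no declared output fields
        c.emit(List.of("tweet"));                 // safe: no-op instead of NPE
        System.out.println("no NPE for a component without declared output fields");
    }
}
```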
[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager
[ https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028665#comment-15028665 ] ASF GitHub Bot commented on FLINK-2954: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1409#discussion_r45973129 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -243,6 +243,20 @@ */ public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location"; + /** +* Prefix for passing custom environment variables to Flink's ApplicationMaster (JobManager). +* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: +* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" +* in the flink-conf.yaml. +*/ + public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env."; + + /** +* Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows +* setting custom environment variables. +*/ + public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; --- End diff -- I think `YARN_RESOURCE_MANAGER_ENV_PREFIX` is just wrong bc its a service of YARN, not of Flink (http://hortonworks.com/blog/apache-hadoop-yarn-resourcemanager/). Whether to use AM or JM is a good question. In the YARN context, AM and JM are basically the same. I think AM is a bit more consistent. > Not able to pass custom environment variables in cluster to processes that > spawning TaskManager > --- > > Key: FLINK-2954 > URL: https://issues.apache.org/jira/browse/FLINK-2954 > Project: Flink > Issue Type: Bug > Components: Command-line client, Distributed Runtime >Affects Versions: 0.10.0 >Reporter: Jian Jiang >Assignee: Robert Metzger >Priority: Critical > Fix For: 1.0.0 > > > There are programs that rely on custom environment variables. 
In hadoop > mapreduce job we can use -Dmapreduce.map.env and -Dmapreduce.reduce.env to > pass them. Similarly in Spark > we can use --conf 'spark.executor.XXX=value for XXX'. There is no such > feature yet in Flink. > This has given Flink a serious disadvantage when customers need such feature.
[jira] [Commented] (FLINK-2954) Not able to pass custom environment variables in cluster to processes that spawning TaskManager
[ https://issues.apache.org/jira/browse/FLINK-2954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028668#comment-15028668 ] ASF GitHub Bot commented on FLINK-2954: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1409#discussion_r45973217 --- Diff: flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java --- @@ -97,6 +98,18 @@ public void tooMuchCutoff() { Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } + @Test + public void testGetEnvironmentVariables() { + Configuration testConf = new Configuration(); + testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native"); + + Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf); + + Assert.assertEquals(1, res.size()); + Map.Entry<String, String> entry = res.entrySet().iterator().next(); + Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey()); + Assert.assertEquals("/usr/lib/native", entry.getValue()); + } --- End diff -- I think its okay to assume that the method will work for all prefix strings. This test is only about the method for extracting the variables.
[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1409#discussion_r45973237 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -221,4 +210,22 @@ public static void addToEnvironment(Map<String, String> environment, private Utils() { throw new RuntimeException(); } + + /** +* Method to extract environment variables from the flinkConfiguration based on the given prefix String. +* +* @param envPrefix Prefix for the environment variables key +* @param flinkConfiguration The Flink config to get the environment variable definition from +*/ + public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) { + Map<String, String> result = new HashMap<>(); + for (Map.Entry<String, String> entry : flinkConfiguration.toMap().entrySet()) { + if (entry.getKey().startsWith(envPrefix)) { + // remove prefix + String key = entry.getKey().substring(envPrefix.length()); --- End diff -- I agree. I'll harden the method.
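The prefix-extraction approach under discussion can be sketched as a standalone illustration (this is not Flink's actual `Utils` class; the "hardening" shown — skipping a key that equals the bare prefix — is one plausible reading of the review comment):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the prefix-based extraction discussed in this thread:
// configuration keys such as "yarn.application-master.env.LD_LIBRARY_PATH"
// become environment variables named "LD_LIBRARY_PATH".
public class EnvPrefixExtractor {

    public static Map<String, String> getEnvironmentVariables(String envPrefix, Map<String, String> conf) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            String key = entry.getKey();
            // hardened: ignore a key that is exactly the prefix, which would
            // otherwise produce an empty variable name
            if (key.startsWith(envPrefix) && key.length() > envPrefix.length()) {
                result.put(key.substring(envPrefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
        conf.put("yarn.properties-file.location", "/tmp/.yarn-properties");
        // only the prefixed key survives, with the prefix stripped
        System.out.println(getEnvironmentVariables("yarn.application-master.env.", conf));
    }
}
```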
[jira] [Assigned] (FLINK-3081) Kafka Periodic Offset Committer does not properly terminate on canceling
[ https://issues.apache.org/jira/browse/FLINK-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-3081: - Assignee: Robert Metzger > Kafka Periodic Offset Committer does not properly terminate on canceling > > > Key: FLINK-3081 > URL: https://issues.apache.org/jira/browse/FLINK-3081 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 0.10.1 >Reporter: Stephan Ewen >Assignee: Robert Metzger >Priority: Blocker > Fix For: 1.0.0, 0.10.2 > > > The committer is only stopped at the end of the run method. Any termination > of the run method via an exception keeps the periodic committer thread > running.
[GitHub] flink pull request: [FLINK-2954] Add config parameter for passing ...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1409#issuecomment-159907627 Thank you for the good review. I addressed some of your concerns.
[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159910663 Please give me an hour or so to look at this :)
[jira] [Commented] (FLINK-3081) Kafka Periodic Offset Committer does not properly terminate on canceling
[ https://issues.apache.org/jira/browse/FLINK-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028781#comment-15028781 ] ASF GitHub Bot commented on FLINK-3081: --- GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/1410 [FLINK-3081] Properly stop periodic Kafka committer You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink3081 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1410.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1410 commit 70373b381fbcb5afc3a56f8610766cfe31eb963f Author: Robert Metzger Date: 2015-11-26T13:25:54Z [FLINK-3081] Properly stop periodic Kafka committer
[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159915515 Looks good and the minimum delay between checkpoints would be an extremely useful feature. What's missing for that?
[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028799#comment-15028799 ] ASF GitHub Bot commented on FLINK-3046: --- Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159916143 Seems to work now :) :+1: > Integrate the Either Java type with the TypeExtractor > - > > Key: FLINK-3046 > URL: https://issues.apache.org/jira/browse/FLINK-3046 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Affects Versions: 1.0.0 >Reporter: Vasia Kalavri >Assignee: Timo Walther > > Integrate the Either Java type with the TypeExtractor, so that the APIs > recognize the type and choose the type info properly.
[jira] [Created] (FLINK-3082) Confusing error about ManualTimestampSourceFunction
Niels Basjes created FLINK-3082: --- Summary: Confusing error about ManualTimestampSourceFunction Key: FLINK-3082 URL: https://issues.apache.org/jira/browse/FLINK-3082 Project: Flink Issue Type: Bug Reporter: Niels Basjes I wrote a source like this: {code} public class Foo extends RichSourceFunction { {code} and then did {code} ctx.collectWithTimestamp(event, event.eventTimestamp); {code} I got this error: {code} Caused by: java.lang.UnsupportedOperationException: Automatic-Timestamp sources cannot emit elements with a timestamp. See interface ManualTimestampSourceFunction if you want to manually assign timestamps to elements. at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collectWithTimestamp(StreamSource.java:97) {code} After some digging it turns out that {{ManualTimestampSourceFunction}} was renamed to {{EventTimeSourceFunction}} and apparently the old name still lingers in this error message.
[jira] [Commented] (FLINK-3082) Confusing error about ManualTimestampSourceFunction
[ https://issues.apache.org/jira/browse/FLINK-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15028861#comment-15028861 ] ASF GitHub Bot commented on FLINK-3082: --- GitHub user nielsbasjes opened a pull request: https://github.com/apache/flink/pull/1411 [FLINK-3082] Fixed confusing error about an interface that no longer exists The ManualTimestampSourceFunction interface does not exist. Yet there are error messages that tell you to take a look at it. This simply fixes these error messages. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nielsbasjes/flink FLINK-3082 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1411.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1411 commit 80d69afb855214bb41a1f257049c550dc4ca3859 Author: Niels Basjes Date: 2015-11-26T14:21:04Z [FLINK-3082] Fixed confusing error about an interface that no longer exists
[GitHub] flink pull request: [FLINK-3051] Control the maximum number of con...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1408#issuecomment-159932475 Missing for that is a bit of code in the checkpoint coordinator that marks the time when checkpoints become possible again and adds that delay to the time when the next checkpoint is scheduled. I'd suggest to add that as a followup...
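The follow-up Stephan describes can be sketched roughly like this — remember when the last checkpoint completed and push the next trigger time out by a configured minimum pause. Class and method names here are illustrative, not Flink's actual CheckpointCoordinator:

```java
// Illustrative sketch of a minimum pause between checkpoints, as discussed
// above: the coordinator records when a checkpoint last completed, and the
// scheduler asks for the earliest admissible trigger time. Not Flink code.
public class CheckpointPacing {

    private final long minPauseMillis;        // configured minimum delay between checkpoints
    private long lastCompletionMillis = -1;   // -1 means no checkpoint has completed yet

    public CheckpointPacing(long minPauseMillis) {
        this.minPauseMillis = minPauseMillis;
    }

    /** Mark the point in time at which checkpoints become possible again. */
    public void onCheckpointComplete(long nowMillis) {
        lastCompletionMillis = nowMillis;
    }

    /** Earliest time at which the next checkpoint may be triggered. */
    public long earliestNextTrigger(long nowMillis) {
        if (lastCompletionMillis < 0) {
            return nowMillis; // first checkpoint: no pause to respect yet
        }
        return Math.max(nowMillis, lastCompletionMillis + minPauseMillis);
    }

    public static void main(String[] args) {
        CheckpointPacing pacing = new CheckpointPacing(5_000);
        pacing.onCheckpointComplete(100_000);
        // a trigger requested at t=101000 is pushed out to t=105000
        System.out.println(pacing.earliestNextTrigger(101_000));
    }
}
```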
[GitHub] flink pull request: [FLINK-3081] Properly stop periodic Kafka comm...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1410#discussion_r45985979 --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java --- @@ -415,13 +415,17 @@ public void run(SourceContext sourceContext) throws Exception { long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "6")); offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this); offsetCommitter.start(); + offsetCommitter.setDaemon(true); --- End diff -- I think that has to be before `start()` (otherwise it fails)
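Stephan's point can be checked with plain JDK threads: `Thread.setDaemon()` throws `IllegalThreadStateException` once the thread is alive, so the call has to come before `start()`. A minimal demonstration (generic JDK behavior, not Flink code):

```java
// Demonstrates why setDaemon(true) must precede start(): the JDK rejects
// changing the daemon status of a thread that is already alive.
public class DaemonOrdering {
    public static void main(String[] args) throws InterruptedException {
        Thread committer = new Thread(() -> {
            try {
                Thread.sleep(500); // stand-in for the periodic commit loop
            } catch (InterruptedException ignored) {
                // real code would restore the interrupt status here
            }
        });

        committer.setDaemon(true); // correct order: before start()
        committer.start();

        try {
            committer.setDaemon(false); // wrong order: thread is already running
        } catch (IllegalThreadStateException e) {
            System.out.println("setDaemon after start() throws IllegalThreadStateException");
        }
        committer.join();
    }
}
```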
[GitHub] flink pull request: [FLINK-3081] Properly stop periodic Kafka comm...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1410#issuecomment-159933351 One small comment, otherwise looks good...
[GitHub] flink pull request: [FLINK-2522] Adds streaming support for Flink-...
GitHub user nikste opened a pull request: https://github.com/apache/flink/pull/1412 [FLINK-2522] Adds streaming support for Flink-Scala-Shell Adds streaming collect support for scala You can merge this pull request into a Git repository by running: $ git pull https://github.com/nikste/flink Flink-2522_Scala_shell_streaming Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1412.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1412 commit b736c2a6fb5cdc487f6389f58b2b6cf969b96991 Author: Nikolaas Steenbergen Date: 2015-08-12T08:42:59Z Adds streaming support for Flink-Scala-Shell Adds streaming collect support for scala
[jira] [Commented] (FLINK-3083) Add docs how to configure streaming fault tolerance
[ https://issues.apache.org/jira/browse/FLINK-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029059#comment-15029059 ] ASF GitHub Bot commented on FLINK-3083: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1413 [FLINK-3083] [docs] Add docs on how to configure streaming fault tolerance This documents the features of #1408 You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink ft_docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1413.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1413 > Add docs how to configure streaming fault tolerance > --- > > Key: FLINK-3083 > URL: https://issues.apache.org/jira/browse/FLINK-3083 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.0.0
[jira] [Created] (FLINK-3084) File State Backend should not write very small state into files
Stephan Ewen created FLINK-3084:
---

Summary: File State Backend should not write very small state into files
Key: FLINK-3084
URL: https://issues.apache.org/jira/browse/FLINK-3084
Project: Flink
Issue Type: Improvement
Components: Streaming
Affects Versions: 0.10.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.0.0

Currently, the {{FsStateBackend}} writes all state into files. Some state (like Kafka offsets) is so small that it adds unnecessary overhead, and sometimes the checkpointed file handles are larger than the actual state. Small state (below a certain threshold, say 1 KB) should not be stored in files, but directly in the state handles.
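To make the proposal concrete, here is a minimal sketch of the inline-vs-file decision the issue describes. Class and method names are illustrative stand-ins, not Flink's actual {{FsStateBackend}} API:

```java
// Hypothetical sketch of the proposed behavior: state below a threshold is
// kept inline in the state handle instead of being written to a file.
class SmallStateSketch {

    static final int INLINE_THRESHOLD = 1024; // 1 KB, as suggested in the issue

    interface StateHandle { byte[] getState(); }

    // Small state travels inside the handle itself, so no file I/O is needed.
    static class ByteStateHandle implements StateHandle {
        private final byte[] data;
        ByteStateHandle(byte[] data) { this.data = data; }
        public byte[] getState() { return data; }
    }

    // Large state is written to a file; the handle stores only the path.
    static class FileStateHandle implements StateHandle {
        private final String path;
        private final byte[] data; // stand-in for reading the file back
        FileStateHandle(String path, byte[] data) { this.path = path; this.data = data; }
        public byte[] getState() { return data; }
        public String getFilePath() { return path; }
    }

    static StateHandle checkpoint(byte[] serializedState) {
        if (serializedState.length <= INLINE_THRESHOLD) {
            return new ByteStateHandle(serializedState);
        } else {
            // The real backend would write to the checkpoint directory here.
            return new FileStateHandle("/checkpoints/chk-1/state", serializedState);
        }
    }
}
```

A Kafka-offset-sized state (a few bytes) would take the inline branch, while operator state of several megabytes would still go to a file.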
[jira] [Created] (FLINK-3085) Move State Backend Initialization from "registerInputOutput()" to "invoke()"
Stephan Ewen created FLINK-3085:
---

Summary: Move State Backend Initialization from "registerInputOutput()" to "invoke()"
Key: FLINK-3085
URL: https://issues.apache.org/jira/browse/FLINK-3085
Project: Flink
Issue Type: Improvement
Components: Streaming
Affects Versions: 0.10.0
Reporter: Stephan Ewen
Fix For: 1.0.0

The state backend initialization currently happens in the {{StreamTask}}'s {{registerInputOutput()}}. For better error handling, it should be part of {{invoke()}}, where the task is properly interrupted, threads are properly joined, and exceptions are handled with awareness of cancellation.
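A minimal sketch of the lifecycle change the issue argues for, using simplified stand-in interfaces rather than Flink's actual classes: with initialization inside {{invoke()}}, a single try/finally guarantees the backend is disposed on success, failure, and cancellation alike.

```java
// Hypothetical sketch: before the change, initialization happened in
// registerInputOutput(), outside the task's main error-handling scope.
// After the change, everything lives inside invoke().
class StreamTaskSketch {

    interface StateBackend {
        void initialize() throws Exception;
        void dispose();
    }

    static class RecordingBackend implements StateBackend {
        boolean disposed = false;
        public void initialize() {}
        public void dispose() { disposed = true; }
    }

    private final StateBackend backend;

    StreamTaskSketch(StateBackend backend) { this.backend = backend; }

    public void invoke(Runnable userCode) throws Exception {
        backend.initialize();
        try {
            userCode.run();
        } finally {
            // Runs on success, failure, and cancellation alike.
            backend.dispose();
        }
    }
}
```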
[jira] [Commented] (FLINK-3082) Confusing error about ManualTimestampSourceFunction
[ https://issues.apache.org/jira/browse/FLINK-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029089#comment-15029089 ]

ASF GitHub Bot commented on FLINK-3082:
---

Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1411#issuecomment-159956326

Thanks for catching this. Will merge this...

> Confusing error about ManualTimestampSourceFunction
> ---
>
> Key: FLINK-3082
> URL: https://issues.apache.org/jira/browse/FLINK-3082
> Project: Flink
> Issue Type: Bug
> Reporter: Niels Basjes
> Assignee: Niels Basjes
>
> I wrote a source like this:
> {code}
> public class Foo extends RichSourceFunction {
> {code}
> and then did
> {code}
> ctx.collectWithTimestamp(event, event.eventTimestamp);
> {code}
> I got this error:
> {code}
> Caused by: java.lang.UnsupportedOperationException: Automatic-Timestamp sources cannot emit elements with a timestamp. See interface ManualTimestampSourceFunction if you want to manually assign timestamps to elements.
> at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collectWithTimestamp(StreamSource.java:97)
> {code}
> After some digging it turns out that {{ManualTimestampSourceFunction}} was renamed to {{EventTimeSourceFunction}} and apparently the old name still lingers in this error message.
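A simplified sketch of the mechanism behind the confusing error, using stand-in interfaces rather than Flink's actual ones: the source context rejects {{collectWithTimestamp()}} unless the source implements the event-time marker interface, and the bug is only that the error message still names the interface by its old name.

```java
// Minimal stand-in for the check in StreamSource$NonTimestampContext:
// only sources implementing the event-time marker interface (renamed from
// ManualTimestampSourceFunction to EventTimeSourceFunction) may assign
// timestamps themselves.
class TimestampSourceSketch {

    interface SourceFunction<T> {}

    // Marker: sources implementing this may emit elements with timestamps.
    interface EventTimeSourceFunction<T> extends SourceFunction<T> {}

    static <T> void collectWithTimestamp(SourceFunction<T> source, T element, long ts) {
        if (!(source instanceof EventTimeSourceFunction)) {
            // The fix for FLINK-3082 is simply to name the *new* interface here.
            throw new UnsupportedOperationException(
                "Automatic-Timestamp sources cannot emit elements with a timestamp. "
                + "See interface EventTimeSourceFunction if you want to manually "
                + "assign timestamps to elements.");
        }
        // ... emit (element, ts) downstream ...
    }
}
```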
[jira] [Commented] (FLINK-3051) Define a maximum number of concurrent inflight checkpoints
[ https://issues.apache.org/jira/browse/FLINK-3051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029136#comment-15029136 ]

ASF GitHub Bot commented on FLINK-3051:
---

Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1408

> Define a maximum number of concurrent inflight checkpoints
> ---
>
> Key: FLINK-3051
> URL: https://issues.apache.org/jira/browse/FLINK-3051
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.0.0
>
> The checkpoint coordinator should define an option to limit the maximum number of concurrent inflight checkpoints, as well as the checkpoint timeouts.
[jira] [Created] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations
Timo Walther created FLINK-3086:
---

Summary: ExpressionParser does not support concatenation of suffix operations
Key: FLINK-3086
URL: https://issues.apache.org/jira/browse/FLINK-3086
Project: Flink
Issue Type: Bug
Components: Table API
Reporter: Timo Walther

The ExpressionParser of the Table API does not support concatenation of suffix operations, e.g. {code}table.select("field.cast(STRING).substring(2)"){code} throws an exception:

{code}
org.apache.flink.api.table.ExpressionException: Could not parse expression: string matching regex `\z' expected but `.' found
at org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224)
{code}

However, the Scala implicit Table Expression API supports this.
[jira] [Commented] (FLINK-3046) Integrate the Either Java type with the TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15029211#comment-15029211 ]

ASF GitHub Bot commented on FLINK-3046:
---

Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1393#issuecomment-159983550

Will merge this tomorrow if no objections are raised.

> Integrate the Either Java type with the TypeExtractor
> ---
>
> Key: FLINK-3046
> URL: https://issues.apache.org/jira/browse/FLINK-3046
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Affects Versions: 1.0.0
> Reporter: Vasia Kalavri
> Assignee: Timo Walther
>
> Integrate the Either Java type with the TypeExtractor, so that the APIs recognize the type and choose the type info properly.
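For context, a minimal sketch of an Either value type in the general shape of Flink's Java Either (Left/Right factories, isLeft/isRight accessors), shown only to illustrate the kind of two-variant type the TypeExtractor needs to recognize. This is a simplified stand-in, not Flink's actual implementation:

```java
// Hypothetical, simplified Either: a value that is exactly one of Left or Right.
abstract class EitherSketch<L, R> {

    public static <L, R> EitherSketch<L, R> left(L value) { return new Left<>(value); }
    public static <L, R> EitherSketch<L, R> right(R value) { return new Right<>(value); }

    public abstract boolean isLeft();
    public boolean isRight() { return !isLeft(); }
    public abstract L left();
    public abstract R right();

    static final class Left<L, R> extends EitherSketch<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        public boolean isLeft() { return true; }
        public L left() { return value; }
        public R right() { throw new IllegalStateException("no right value"); }
    }

    static final class Right<L, R> extends EitherSketch<L, R> {
        private final R value;
        Right(R value) { this.value = value; }
        public boolean isLeft() { return false; }
        public L left() { throw new IllegalStateException("no left value"); }
        public R right() { return value; }
    }
}
```

The TypeExtractor's job, per the issue, is to walk such a type's two generic parameters and build the proper type information for each variant automatically.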