[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14940918#comment-14940918
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/1046
> Add configuration support in Storm-compatibility
>
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
> Issue Type: New Feature
> Components: Storm Compatibility
> Reporter: fangfengbin
> Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and
> `Bolt.prepare()`, respectively. Both methods have a config `Map` as first
> parameter. This map is currently not populated. Thus, Spouts and Bolts cannot
> be configured with user-defined parameters. In order to support this feature,
> spout and bolt wrapper classes need to be extended to create a proper `Map`
> object. Furthermore, the clients need to be extended to take a `Map` and
> translate it into a Flink `Configuration` that is forwarded to the wrappers
> for proper initialization of the map.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14940920#comment-14940920
]
Matthias J. Sax commented on FLINK-2525:
Fixed via 9fe285a77de5cd1a35ceb58f9295751fd3dd9e15
Thanks for your contribution!
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900123#comment-14900123
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-141851757
Hi @fhueske, I will update the PR to use ExecutionConfig. Thanks.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14876240#comment-14876240
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-141550369
Hi @ffbin,
not sure if you followed the discussion on the mailing list, but we
discussed using the ExecutionConfig instead of the JobConfig. The reason is
that ExecutionConfig is user-facing, whereas JobConfig is used for
system-internal configurations.
See the discussion
[here](https://mail-archives.apache.org/mod_mbox/flink-dev/201509.mbox/%3CCANC1h_tVL1NHpoYrB9LaGRUP=tsyn2cpd2zfjoop2bhuddp...@mail.gmail.com%3E).
It would be nice if you could update the PR to use ExecutionConfig.
Thanks a lot, Fabian
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744911#comment-14744911
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-140291773
As per the discussion on the dev list, the `ExecutionConfig` has the
`GlobalJobParameters`, which are useful if one type of config is used across
all operators.
If each of the operators needs its own config, can you create an abstract
base class for the Storm functions which takes a configuration as an argument?
BTW: There is no plan to remove the `withParameters()` method in the batch
API. It is just not the encouraged mechanism any more...
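Editor's sketch of the second option mentioned above (an abstract base class that takes a configuration as an argument). This is plain Java with no Flink or Storm dependency; `PerOperatorConfigSketch`, `ConfiguredFunction`, `Tokenizer`, and the `"delimiter"` key are hypothetical names chosen for illustration and are not taken from the pull request.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class PerOperatorConfigSketch {

    /** Hypothetical base class: every function instance carries its own configuration. */
    public abstract static class ConfiguredFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final HashMap<String, String> config;

        protected ConfiguredFunction(Map<String, String> config) {
            // Defensive copy so later changes to the caller's map do not leak in.
            this.config = new HashMap<>(config);
        }

        protected Map<String, String> getConfig() {
            return Collections.unmodifiableMap(config);
        }
    }

    /** Hypothetical function that reads a delimiter from its own configuration. */
    public static class Tokenizer extends ConfiguredFunction {
        private static final long serialVersionUID = 1L;

        public Tokenizer(Map<String, String> config) {
            super(config);
        }

        public String[] split(String sentence) {
            String delimiter = getConfig().getOrDefault("delimiter", " ");
            // Quote the delimiter so it is treated literally, not as a regex.
            return sentence.split(Pattern.quote(delimiter));
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("delimiter", ",");
        Tokenizer tokenizer = new Tokenizer(conf);
        System.out.println(tokenizer.split("a,b,c").length); // prints 3
    }
}
```

With this shape, two operators in the same job can be given different maps, which is the case a single job-wide configuration does not cover.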
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744978#comment-14744978
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-140301378
Storm only supports one global configuration that is shared across all
spouts/bolts. So `GlobalJobParameters` will work just fine.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744355#comment-14744355
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r39452503
--- Diff:
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
@@ -418,6 +418,17 @@ public void setBytes(String key, byte[] bytes) {
}
}
+ /**
+* Returns the clone of confData.
+*
+* @return the clone of confData
+*/
+ public HashMap getConfDataClone() {
+ synchronized (this.confData) {
+ return new HashMap(this.confData);
--- End diff --
This will just create a shallow copy, not a deep copy. Is that good enough?
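Editor's sketch of the distinction raised here, in plain Java. It assumes a map of the same general shape as `confData`; the class name `ShallowCopyDemo` and the keys `"hosts"` and `"parallelism"` are hypothetical. The copy constructor duplicates the entries but not the objects the values refer to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShallowCopyDemo {

    /** Same technique as `new HashMap(this.confData)`: entries are copied, values are shared. */
    public static Map<String, Object> shallowCopy(Map<String, Object> source) {
        return new HashMap<>(source);
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Map<String, Object> confData = new HashMap<>();
        List<String> hosts = new ArrayList<>();
        hosts.add("host-a");
        confData.put("hosts", hosts);
        confData.put("parallelism", 4);

        Map<String, Object> copy = shallowCopy(confData);

        // Replacing a top-level entry in the copy leaves the original untouched.
        copy.put("parallelism", 8);
        System.out.println(confData.get("parallelism")); // prints 4

        // Mutating a nested value through the copy is visible in the original,
        // because both maps reference the same list object.
        ((List<String>) copy.get("hosts")).add("host-b");
        System.out.println(confData.get("hosts")); // prints [host-a, host-b]
    }
}
```

When every value is immutable (strings, boxed numbers), the two kinds of copy behave the same from the caller's point of view; the difference only shows up for mutable values such as lists or byte arrays.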
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744372#comment-14744372
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r39453328
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
---
@@ -91,7 +91,16 @@ public InputSplitProvider getInputSplitProvider() {
public Configuration getTaskStubParameters() {
return new
TaskConfig(env.getTaskConfiguration()).getStubParameters();
}
-
+
+ /**
+* Returns the job configuration.
+*
+* @return The job configuration.
+*/
+ public Configuration getJobConfiguration() {
+ return new Configuration(env.getJobConfiguration());
--- End diff --
The `JobConfiguration` is a system-internal configuration. I am not sure
that it is a good idea to give access to it in the `StreamingRuntimeContext`.
This change will be visible for all DataStream programs.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744362#comment-14744362
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r39453004
--- Diff:
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
@@ -418,6 +418,17 @@ public void setBytes(String key, byte[] bytes) {
}
}
+ /**
+* Returns the clone of confData.
+*
+* @return the clone of confData
+*/
+ public HashMap getConfDataClone() {
+ synchronized (this.confData) {
+ return new HashMap(this.confData);
--- End diff --
Yes. That's fine.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744330#comment-14744330
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r39450937
--- Diff: docs/apis/storm_compatibility.md ---
@@ -169,6 +169,13 @@ The input type is `Tuple1` and
`Fields("sentence")` specify that `input.
See
[BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java)
and
[BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java)
for examples.
+## Configure for embedded Spouts/Bolts
+Embedded Spouts/Bolts can be configure with user defined parameters.
+User defined parameters is stored in a `Map`(as in Storm).
+And this Map is provided as a parameter in the calls `Spout.open(...)` and
`Bolt.prepare(...)`.
+Configuration can be used in storm topologies mode or flink mode.
--- End diff --
capitalize *Storm* and *Flink*
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744329#comment-14744329
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r39450885
--- Diff: docs/apis/storm_compatibility.md ---
@@ -169,6 +169,13 @@ The input type is `Tuple1` and
`Fields("sentence")` specify that `input.
See
[BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java)
and
[BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java)
for examples.
+## Configure for embedded Spouts/Bolts
+Embedded Spouts/Bolts can be configure with user defined parameters.
+User defined parameters is stored in a `Map`(as in Storm).
--- End diff --
... *are* stored ...
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744399#comment-14744399
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-140222747
Actually, I think going through the `TaskConfig` as proposed by @mjsax is
the cleaner way. Going through the system-internal `JobConfiguration` and
exposing it to user programs is not a good choice, in my opinion.
The purpose of `TaskConfig` is exactly to give parameters to a task
(function, spout). Also the `StreamingRuntimeContext` would not need to be
adapted, because it already offers a method `getTaskStubParameters()`. Would
that work as well or are there major issues preventing you from using the
`TaskConfig`?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744411#comment-14744411
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-140224226
It is not clear (at least to me) how to do this. The API does not offer an
(obvious) way to set a configuration... (or I just don't get it).
`StreamExecutionEnvironment` only offers `.getConfig()` and there is no
`.withParameters(...)` in the Streaming API (which is "deprecated" according to
the discussion on the dev list).
IMHO, the best way would be the possibility to set a configuration in the
environment that is distributed to all operators. Should we extend the
Streaming API for this?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14742904#comment-14742904
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-139959540
@StephanEwen @rmetzger Can you have a look and see whether it can be merged?
I am also working on Storm task hooks, which depend on this PR. Thank you very
much!
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736615#comment-14736615
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-138865484
@mjsax Thanks. I have finished the changes addressing all comments.
@StephanEwen @rmetzger Can you have a look and see whether it can be merged?
Thank you very much!
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734400#comment-14734400
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-138470052
@mjsax @StephanEwen @rmetzger I have finished the changes addressing all
comments and updated the documentation. Can you have a look and see whether it
can be merged? Thank you very much!
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734657#comment-14734657
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38914538
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
---
@@ -70,7 +92,7 @@ public void testRawType() throws Exception {
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
- Sets.newHashSet(new String[] {
Utils.DEFAULT_STREAM_ID }));
+ Sets.newHashSet(new
String[]{Utils.DEFAULT_STREAM_ID}));
--- End diff --
No formatting changes please.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734665#comment-14734665
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-138524518
I just had a few "cosmetic" comments. Otherwise it looks good to me to get
merged.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734641#comment-14734641
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38913931
--- Diff: docs/apis/storm_compatibility.md ---
@@ -201,6 +201,26 @@ DataStream s2 =
splitStream.select("s2").transform(/* use Bolt f
See
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java)
for a full example.
+## Configure for embedded Spouts/Bolts
--- End diff --
Move this whole section one up.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734655#comment-14734655
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38914417
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
---
@@ -107,4 +112,39 @@ public static TopologyContext
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs,
bolts, null), taskToComponents, taskId);
}
+ /**
+* Get storm configuration from StreamingRuntimeContext.
+* @param ctx The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
+* If configuration contains classes from the user code,
it may lead to ClassNotFoundException..
--- End diff --
Typo. Two dots at the end.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734623#comment-14734623
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38913193
--- Diff: docs/apis/storm_compatibility.md ---
@@ -201,6 +201,26 @@ DataStream s2 =
splitStream.select("s2").transform(/* use Bolt f
See
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java)
for a full example.
+## Configure for embedded Spouts/Bolts
+Embedder spouts/bolts can be configure with user defined parameters. User
defined parameters is a config `Map` and be called by `Spout.open(...)` and
`Bolt.prepare()`
+as first parameter. Configuration can be used in storm topologies mode or
flink mode.
+
+ 1.Storm topologies mode example
+
+...
+ Map conf = new HashMap();
--- End diff --
Can you please use the same code markup as in the other code examples?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734661#comment-14734661
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38914633
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
---
@@ -88,9 +88,23 @@ public static void main(final String[] args) throws
Exception {
private static class ExclamationMap implements MapFunction {
+ private String exclamation;
+
+ public ExclamationMap() {
--- End diff --
Use `exclamationNum` as constructor parameter and build `exclamation`
accordingly.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734648#comment-14734648
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38914053
--- Diff: docs/apis/storm_compatibility.md ---
@@ -201,6 +201,26 @@ DataStream s2 =
splitStream.select("s2").transform(/* use Bolt f
See
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java)
for a full example.
+## Configure for embedded Spouts/Bolts
+Embedder spouts/bolts can be configure with user defined parameters. User
defined parameters is a config `Map` and be called by `Spout.open(...)` and
`Bolt.prepare()`
+as first parameter. Configuration can be used in storm topologies mode or
flink mode.
+
+ 1.Storm topologies mode example
--- End diff --
I don't think we need example code for embedded mode. (See "multiple output
streams": if a whole topology is executed using FlinkTopologyBuilder etc.,
no special attention is required; it works as in regular Storm.) A
simple sentence in the paragraph should be sufficient.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733433#comment-14733433
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38844802
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
---
@@ -78,6 +86,11 @@ public static void main(final String[] args) throws
Exception {
exclaimed.print();
}
+ // set bolt and map exclamation marks num
+ Configuration conf = new Configuration();
+ conf.setInteger(new String("exclamationNum"), exclamationNum);
--- End diff --
Why are you creating Strings like this?
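The question refers to `new String("exclamationNum")` in the diff above. A minimal sketch of why the constructor call is redundant: a string literal already is a `String` object, so wrapping it only allocates a second object with the same content. The class and method names below are illustrative and not part of the pull request.

```java
// Illustrative only; class and method names are not from the pull request.
final class StringKeySketch {

    // Redundant: copies the literal into a second String object.
    static String viaConstructor() {
        return new String("exclamationNum");
    }

    // Preferred: use the literal directly.
    static String viaLiteral() {
        return "exclamationNum";
    }

    public static void main(String[] args) {
        String a = viaConstructor();
        String b = viaLiteral();
        System.out.println(a.equals(b)); // compares content
        System.out.println(a == b);      // compares object identity
    }
}
```

Both forms work as a configuration key, since lookups compare content; the literal simply avoids the extra allocation.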
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733429#comment-14733429
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38844529
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
---
@@ -107,4 +112,40 @@ public static TopologyContext
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs,
bolts, null), taskToComponents, taskId);
}
+ /**
+* Get storm configuration from StreamingRuntimeContext.
+*
+* @param ctx
+*The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
+*/
+ public static Map getStormConfFromContext(final RuntimeContext ctx)
+ throws Exception {
+ Map stormConf = null;
+ if (ctx instanceof StreamingRuntimeContext) {
+ Configuration jobConfiguration =
((StreamingRuntimeContext) ctx).getJobConfiguration();
+
+ if (jobConfiguration != null) {
+ /* topologies mode */
+ stormConf = (Map)
InstantiationUtil.readObjectFromConfig(jobConfiguration,
StormConfig.STORM_DEFAULT_CONFIG, Map.class.getClassLoader());
--- End diff --
Since the map is untyped, users might put arbitrary objects into the Map,
including instances of classes from the user code.
This would lead to ClassNotFoundExceptions when running the code on
clusters. Can you use the classloader of `StormWrapperSetupHelper`?
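The concern can be illustrated with plain Java serialization. The following is a sketch of the general technique only, not Flink's `InstantiationUtil`; all names in it are hypothetical. It resolves classes through an explicitly passed loader and shows that `Map.class.getClassLoader()` (which is `null`, i.e. the bootstrap loader) cannot see an application class, whereas the loader of an application class can.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch; none of these names come from Flink or the pull request.
final class ClassLoaderReadSketch {

    // Stand-in for a class that only exists in the user's job jar.
    static final class UserValue implements Serializable {
        private static final long serialVersionUID = 1L;
        final int n;

        UserValue(int n) {
            this.n = n;
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Resolves every class through the given loader; null means the bootstrap loader.
    static Object deserialize(byte[] data, final ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> conf = new HashMap<>();
        conf.put("custom", new UserValue(7));
        byte[] data = serialize(conf);

        // The loader of an application class can see UserValue.
        Map<?, ?> ok = (Map<?, ?>) deserialize(data, ClassLoaderReadSketch.class.getClassLoader());
        System.out.println(((UserValue) ok.get("custom")).n);

        // The bootstrap loader knows java.util.HashMap but not UserValue.
        try {
            deserialize(data, Map.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

In a cluster setting the same reasoning applies one level up: the loader used for reading the map has to be one that can see the classes shipped with the job.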
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733670#comment-14733670
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-138292133
@mjsax @StephanEwen @rmetzger I have finished the changes for all comments.
Can you take a look and check whether it can be merged? Thank you very much!
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733522#comment-14733522
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-138267349
@rmetzger Thanks. I now use the classloader of `StormWrapperSetupHelper`
instead of that of `Map`, and I have changed the way the Strings are created.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733676#comment-14733676
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38860604
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenWithStormConf() throws Exception {
+ final IRichBolt bolt = mock(IRichBolt.class);
+ final StormBoltWrapper wrapper = new
StormBoltWrapper(bolt);
+
+ Configuration jobConfiguration = new Configuration();
+ jobConfiguration.setString(new String("path"), new
String("/home/user/file.txt"));
+ jobConfiguration.setInteger(new String("delimitSize"), 1024);
+ Environment env = new RuntimeEnvironment(new JobID(), new
JobVertexID(), new ExecutionAttemptID(),
+ new String(), new String(), 1, 2,
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+ mock(MemoryManager.class),
mock(IOManager.class), mock(BroadcastVariableManager.class),
+ mock(AccumulatorRegistry.class),
mock(InputSplitProvider.class), mock(Map.class),
+ new ResultPartitionWriter[1], new InputGate[1],
mock(ActorGateway.class),
+ mock(TaskManagerRuntimeInfo.class));
+ StreamingRuntimeContext ctx = new StreamingRuntimeContext(env,
new ExecutionConfig(),
+ mock(KeySelector.class),
+ mock(StateHandleProvider.class),
mock(Map.class));
+
+ wrapper.setup(mock(Output.class), ctx);
+ wrapper.open(mock(Configuration.class));
--- End diff --
Now I understand. This is the unused `TaskConfiguration`.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733796#comment-14733796
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38866736
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
---
@@ -107,4 +112,40 @@ public static TopologyContext
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs,
bolts, null), taskToComponents, taskId);
}
+ /**
+* Get storm configuration from StreamingRuntimeContext.
+*
+* @param ctx
+*The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
--- End diff --
JavaDoc incomplete
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733773#comment-14733773
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38865859
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -97,9 +102,20 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ config = new HashMap();
+
+ /* parameters is task configuration, we can get storm
configuration only from job configuration */
+ Map stormConf =
StormWrapperSetupHelper.getStormConfFromContext(super.getRuntimeContext());
+ if (stormConf != null) {
+ config.putAll(stormConf);
+ }
+ }
--- End diff --
I just had a closer look at the code. Because we do not use the given
`Configuration`, we should move the code into the `run` method and remove the
override of `open()`.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733775#comment-14733775
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38865906
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -57,6 +58,10 @@
* Indicates, if the source is still running or was canceled.
*/
protected volatile boolean isRunning = true;
+ /**
+* The job configuration which include storm configuration.
+*/
+ protected Map config;
--- End diff --
Can be removed, if `open()` is not used.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733788#comment-14733788
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38866357
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
---
@@ -107,4 +112,40 @@ public static TopologyContext
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs,
bolts, null), taskToComponents, taskId);
}
+ /**
+* Get storm configuration from StreamingRuntimeContext.
+*
+* @param ctx
+*The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
+*/
+ public static Map getStormConfFromContext(final RuntimeContext ctx)
+ throws Exception {
+ Map stormConf = null;
--- End diff --
Please add a test for this method.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733809#comment-14733809
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38867651
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
---
@@ -85,12 +103,43 @@ public static void main(final String[] args) throws
Exception {
// USER FUNCTIONS
//
*
- private static class ExclamationMap implements MapFunction {
+ private static class ExclamationMap extends AbstractRichFunction
implements MapFunction {
private static final long serialVersionUID =
-684993133807698042L;
+ private String exclamation;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ RuntimeContext ctx = super.getRuntimeContext();
--- End diff --
Same here.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733813#comment-14733813
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38867892
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
---
@@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
public void open(final Map conf, final TopologyContext context, final
SpoutOutputCollector collector) {
super.open(conf, context, collector);
try {
+ /* If config is given, it should override constructor
path */
+ if (conf != null && conf.containsKey("textpath")) {
+ this.path = (String)conf.get("textpath");
+ }
+
this.reader = new BufferedReader(new
FileReader(this.path));
} catch (final FileNotFoundException e) {
throw new RuntimeException(e);
+ } catch (final NullPointerException e) {
--- End diff --
Why do you catch `NullPointerException`? Can be removed. NPE is a runtime
exception anyway and you just re-throw.
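A minimal sketch of how the `open()` logic might look once the null check and the `NullPointerException` catch are dropped. It is a stand-in without Storm dependencies; the class and method names are hypothetical, and only the `"textpath"` key is taken from the diff.

```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

// Illustrative stand-in for the spout's open() logic; not the code from the pull request.
final class FileSpoutOpenSketch {
    static final String PATH_KEY = "textpath";

    // A path given in the config overrides the constructor path.
    static String resolvePath(Map<?, ?> conf, String constructorPath) {
        return conf.containsKey(PATH_KEY) ? (String) conf.get(PATH_KEY) : constructorPath;
    }

    static BufferedReader open(Map<?, ?> conf, String constructorPath) {
        try {
            // A null path makes FileReader throw NullPointerException on its own,
            // so there is nothing to gain from catching and re-throwing it here.
            return new BufferedReader(new FileReader(resolvePath(conf, constructorPath)));
        } catch (FileNotFoundException e) {
            // Checked exception: wrap it, as the original code does.
            throw new RuntimeException(e);
        }
    }
}
```

Only the checked `FileNotFoundException` needs handling; unchecked exceptions propagate to the caller unchanged.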
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733811#comment-14733811
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38867724
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
---
@@ -31,10 +31,22 @@
public class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
+ private String exclamation;
+
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector
collector) {
_collector = collector;
+ exclamation = "!!!";
+
+ if (conf != null && conf.containsKey("exclamationNum")) {
--- End diff --
We can safely assume, that `conf != null`. No checking needed.
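A sketch of the config lookup without the null check. The diff is cut off before the value is parsed, so the parsing below is an assumption (an `Integer` or a numeric `String`), and the class and method names are hypothetical; the key `"exclamationNum"` and the default `"!!!"` come from the diff.

```java
import java.util.Map;

// Illustrative stand-in for the prepare() logic; not the code from the pull request.
final class ExclamationConfigSketch {
    static final String KEY = "exclamationNum";

    // Builds the suffix from the config, falling back to "!!!" when the key is absent.
    static String exclamationFrom(Map<?, ?> conf) {
        if (!conf.containsKey(KEY)) {
            return "!!!";
        }
        // Assumption: the value is an Integer or a numeric String.
        int count = Integer.parseInt(String.valueOf(conf.get(KEY)));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            sb.append('!');
        }
        return sb.toString();
    }
}
```

Because the wrapper always passes a map, possibly an empty one, `containsKey` alone is enough to decide between the default and the configured value.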
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733812#comment-14733812
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38867760
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
---
@@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
public void open(final Map conf, final TopologyContext context, final
SpoutOutputCollector collector) {
super.open(conf, context, collector);
try {
+ /* If config is given, it should override constructor
path */
+ if (conf != null && conf.containsKey("textpath")) {
+ this.path = (String)conf.get("textpath");
--- End diff --
`conf != null` can be removed.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733805#comment-14733805
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38867413
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
---
@@ -86,11 +99,43 @@ public static void main(final String[] args) throws
Exception {
// USER FUNCTIONS
//
*
- private static class ExclamationMap implements MapFunction {
+ private static class ExclamationMap extends AbstractRichFunction
implements MapFunction {
+
+ private String exclamation;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ RuntimeContext ctx = super.getRuntimeContext();
--- End diff --
You should not use `JobConfiguration` for regular Flink functions.
Instantiate `exclamation` via the constructor argument `exclamationNum`.
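A minimal sketch of the suggested shape, assuming the function only needs the number of exclamation marks. To keep it compilable without Flink on the classpath, `java.util.function.Function` stands in for Flink's `MapFunction`; that substitution is an assumption of this sketch and not part of the pull request.

```java
import java.io.Serializable;
import java.util.function.Function;

// Illustrative sketch; java.util.function.Function stands in for Flink's MapFunction.
final class ExclamationMap implements Function<String, String>, Serializable {
    private static final long serialVersionUID = 1L;

    private final String exclamation;

    // The count is passed at construction time instead of being read from the job configuration.
    ExclamationMap(int exclamationNum) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < exclamationNum; i++) {
            sb.append('!');
        }
        this.exclamation = sb.toString();
    }

    @Override
    public String apply(String value) {
        return value + exclamation;
    }
}
```

With this shape the function carries its own parameter, so no `open()` override or runtime context lookup is needed.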
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733821#comment-14733821
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-138320697
Can you update the documentation, too? For README.md, just delete the line that
claims configuration is not supported. The web page documentation should contain a
short paragraph on how embedded Spouts/Bolts can be configured (for whole
topologies it works as in Storm, i.e., we don't need to cover this in the
documentation).
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14732295#comment-14732295
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-138051794
@mjsax @StephanEwen Thanks. I have finished the changes to the core classes.
When a user calls env.getConfig().setGlobalJobParameters(conf); to set the Storm
configuration, I need to convert the Configuration into a map. toMap()
is not enough, so I added getConfDataClone() to get a clone of the inner config
hashmap.
I had a look at how to use TaskConfiguration instead of JobConfiguration.
The TaskConfiguration transfer path is:
TaskConfiguration <-- TaskDeploymentDescriptor <-- JobVertex <--
StreamConfig
In StreamingJobGraphGenerator.java, the setVertexConfig() function sets the
TaskConfiguration like this:
...
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
...
If we want to use TaskConfiguration instead of JobConfiguration, we need to
add a special interface to set stormConfig in StreamGraph and StreamConfig.
I think this may break the separation between the storm-compatibility and
core code.
What is your opinion?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730868#comment-14730868
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-137748262
Concerning the changes at the core classes:
- The storm config key is an application specific key and not part of the
system configuration, therefore it should be defined as part of the application
code.
- When returning the JobManager configuration, this should return an
unmodifiable configuration, so the internal config cannot be altered.
- Do you need access to the inner config hashmap? How about storing the
storm specific properties in a key like this:
`InstantiationUtil.writeObjectToConfig(stormProps, config, "STORM_CONF");`
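The idea behind the last point can be sketched as follows: serialize the whole Storm property map into one entry under an application-defined key, and deserialize it again inside the wrapper. This is an illustration under stated assumptions, not the code from the pull request. A plain `Map<String, byte[]>` stands in for Flink's `Configuration`, and `storeInConfig` / `loadFromConfig` are hypothetical helpers built on standard Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class StormConfRoundTrip {

    /** Application-defined key, kept out of the system-wide config constants. */
    public static final String STORM_CONF_KEY = "STORM_CONF";

    /** Hypothetical helper: serializes obj and stores the bytes under a single key. */
    public static void storeInConfig(Serializable obj, Map<String, byte[]> config, String key)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        config.put(key, bytes.toByteArray());
    }

    /** Hypothetical helper: restores the map, or returns an empty map if the key is absent. */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> loadFromConfig(Map<String, byte[]> config, String key)
            throws IOException, ClassNotFoundException {
        byte[] raw = config.get(key);
        if (raw == null) {
            return new HashMap<>();
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (Map<String, Object>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Object> stormProps = new HashMap<>();
        stormProps.put("textpath", "/tmp/input.txt");

        Map<String, byte[]> config = new HashMap<>();
        storeInConfig(stormProps, config, STORM_CONF_KEY);
        System.out.println(loadFromConfig(config, STORM_CONF_KEY));
    }
}
```

With this layout the wrapper only needs to know one key, and no access to the configuration's internal hash map is required.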
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730843#comment-14730843
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-137742282
I just had a quick look over it, and so far I like it. Two things are
open for discussion. I am not sure if the change to `ConfigConstants` is a good
choice. Would it be feasible to use `TaskConfiguration` instead of
`JobConfiguration`? This would make the code cleaner from my point of view.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730869#comment-14730869
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-137748448
The changes suggested above help with a cleaner separation between the
application (here storm compatibility) and the core code.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725881#comment-14725881
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38456958
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ stormConf = new HashMap();
+
+ /* parameters is task configuration, we can get storm
configuration only from job configuration */
+ RuntimeContext ctx = super.getRuntimeContext();
+ if (ctx instanceof StreamingRuntimeContext) {
+ Configuration jobConfiguration =
((StreamingRuntimeContext) ctx).getJobConfiguration();
+
--- End diff --
I see. Would it be better to add the Storm config to the task-config to
simplify the code in the `open(...)` method? For `FlinkTopology`, the same config would
be added to each Spout/Bolt. At least for embedded mode, using task-config
instead of job-config would be more appropriate IMHO.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14724980#comment-14724980
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-136636200
@mjsax @StephanEwen I have finished the code change. Can you give me some
comments? Thank you very much!
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14724717#comment-14724717
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38382368
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ stormConf = new HashMap();
+
+ /* parameters is task configuration, we can get storm
configuration only from job configuration */
+ RuntimeContext ctx = super.getRuntimeContext();
+ if (ctx instanceof StreamingRuntimeContext) {
+ Configuration jobConfiguration =
((StreamingRuntimeContext) ctx).getJobConfiguration();
+
--- End diff --
open() is usually called by openAllOperators(), and its Configuration
parameter is usually the task Configuration, not the job Configuration.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723153#comment-14723153
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-136289777
This is the stack trace (it occurs in 4/5 runs; the other run failed earlier
due to an unrelated test). It seems you broke something.
```
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 5.149 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase
testJobWithoutObjectReuse(org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase) Time elapsed: 3.825 sec <<< FAILURE!
java.lang.AssertionError: Error while calling the test program: Job execution failed.
at org.junit.Assert.fail(Assert.java:88)
at org.apache.flink.streaming.util.StreamingProgramTestBase.testJobWithoutObjectReuse(StreamingProgramTestBase.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
```
I will review after you have fixed it.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723209#comment-14723209
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-136302889
Travis runs on Linux. There is only a single ":" in the path there.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723212#comment-14723212
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-136304296
Oh, so not all test cases can run successfully on Windows?
I have changed this and committed again.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723219#comment-14723219
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-136305354
This might be the case. I never tried it. And as far as I know, all
developers work on Linux or Mac, so this was never an issue.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723206#comment-14723206
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-136301903
@mjsax Thanks.
I have a question:
On my Windows machine, the textPath of ExclamationWithStormSpoutITCase is
file:/C:/Users/xxx/AppData/Local/Temp/org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase-text.txt.
After splitting by ":", we get the path
/Users/xxx/AppData/Local/Temp/org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase-text.txt
and StormFileSpout cannot open this path. But CI ran successfully before.
Do you know why?
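The effect described above can be reproduced in isolation. The sketch below is an illustration only: `afterLastColon` is a hypothetical reconstruction of "split by `:` and keep the last token", which may differ from what the example code actually does, and `viaUri` shows `java.net.URI` as one possible alternative that keeps the drive letter.

```java
import java.net.URI;

public class TextPathSketch {

    /** Hypothetical reconstruction of the naive approach: keep what follows the last ':'. */
    public static String afterLastColon(String textPath) {
        String[] tokens = textPath.split(":");
        return tokens[tokens.length - 1];
    }

    /** Alternative: let java.net.URI strip the scheme, so a drive letter survives. */
    public static String viaUri(String textPath) {
        return URI.create(textPath).getPath();
    }

    public static void main(String[] args) {
        String linux = "file:/tmp/input.txt";
        String windows = "file:/C:/Users/xxx/input.txt";

        System.out.println(afterLastColon(linux));   // one ':' in the input, path stays intact
        System.out.println(afterLastColon(windows)); // second ':' cuts off the drive letter
        System.out.println(viaUri(windows));         // drive letter is kept
    }
}
```

On Linux the path contains a single ":" (after the scheme), so both approaches agree; on Windows the drive letter adds a second ":", and only the URI-based variant preserves it.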
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723413#comment-14723413
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312609
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
---
@@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
public void open(final Map conf, final TopologyContext context, final
SpoutOutputCollector collector) {
super.open(conf, context, collector);
try {
+ /* if inputFile path has not set, get it from storm
configuration */
+ if (this.path == null && conf != null &&
conf.containsKey(new String("textpath"))) {
--- End diff --
I would not check for `this.path == null`. If a config is given, it should
override the constructor path.
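A minimal sketch of that precedence rule, assuming the key name `textpath` from the diff. `resolvePath` is a hypothetical helper introduced for illustration; it is not part of `StormFileSpout`.

```java
import java.util.HashMap;
import java.util.Map;

public class PathResolutionSketch {

    /** Key name taken from the diff above. */
    public static final String TEXT_PATH_KEY = "textpath";

    /**
     * Hypothetical helper: a path found in the Storm config wins,
     * and the constructor path is only the fallback.
     */
    public static String resolvePath(String constructorPath, Map<?, ?> conf) {
        if (conf != null) {
            Object configured = conf.get(TEXT_PATH_KEY);
            if (configured != null) {
                return configured.toString();
            }
        }
        return constructorPath;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TEXT_PATH_KEY, "/data/from-config.txt");

        System.out.println(resolvePath("/data/from-constructor.txt", conf));
        System.out.println(resolvePath("/data/from-constructor.txt", null));
    }
}
```

A plain string literal is sufficient for the lookup, so the `new String("textpath")` allocation in the diff is not needed either.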
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723404#comment-14723404
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312112
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
---
@@ -86,11 +99,40 @@ public static void main(final String[] args) throws
Exception {
// USER FUNCTIONS
//
*
- private static class ExclamationMap implements MapFunction {
+ private static class ExclamationMap extends AbstractRichFunction
implements MapFunction {
+
+ private String exclamation;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ exclamation = new String("!!!");
--- End diff --
Move this to a new `else` branch in line 121 to make clear that it is the
default value used when no config is given.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723406#comment-14723406
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312165
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
---
@@ -101,20 +143,23 @@ public String map(String value) throws Exception {
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
+ private static int exclamationNum;
private static boolean parseParameters(final String[] args) {
if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if (args.length == 2) {
+ if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+ exclamationNum = Integer.parseInt(args[2]);
} else {
System.err.println("Usage:
ExclamationWithStormBolt ");
return false;
}
} else {
+ exclamationNum = 3;
System.out.println("Executing ExclamationWithStormBolt
example with built-in default data");
System.out.println(" Provide parameters to read input
data from a file");
System.out.println(" Usage: ExclamationWithStormBolt
");
--- End diff --
The parameter list in the usage message needs to be updated.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723408#comment-14723408
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312340
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
---
@@ -85,8 +103,36 @@ public static void main(final String[] args) throws
Exception {
// USER FUNCTIONS
//
*
- private static class ExclamationMap implements MapFunction {
+ private static class ExclamationMap extends AbstractRichFunction
implements MapFunction {
private static final long serialVersionUID =
-684993133807698042L;
+ private String exclamation;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ exclamation = new String("!!!");
--- End diff --
Why not use the input parameter?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723407#comment-14723407
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312308
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
---
@@ -101,20 +147,23 @@ public String map(String value) throws Exception {
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
+ private static int exclamationNum;
private static boolean parseParameters(final String[] args) {
if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if (args.length == 2) {
+ if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+ exclamationNum = Integer.parseInt(args[2]);
} else {
System.err.println("Usage:
ExclamationWithStormSpout ");
--- End diff --
The parameter list in the usage message needs to be updated.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723401#comment-14723401
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38311967
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
---
@@ -205,7 +211,35 @@ public void open(final Configuration parameters)
throws Exception {
this.numberOfAttributes,
flinkCollector));
}
- this.bolt.prepare(null, topologyContext, stormCollector);
+ Map stormConf = new HashMap();
--- End diff --
This seems to be the same code as in `AbstractStormSpoutWrapper.open(...)`. Can
you unify it, i.e., add a new static method to `StormWrapperSetupHelper` that
does it, and just call it here?
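The unification asked for here could look roughly like the sketch below. All class and method names (`SetupHelperSketch`, `createStormConf`, and the two wrapper stand-ins) are hypothetical and do not reflect the actual `StormWrapperSetupHelper` API; the sketch only shows both wrappers delegating to one static method instead of duplicating the map-building code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a shared helper; not the real StormWrapperSetupHelper.
final class SetupHelperSketch {
    private SetupHelperSketch() {}

    // Builds the config map handed to Spout.open(...) / Bolt.prepare(...).
    // A null source yields an empty map, so Storm code never receives null.
    static Map<Object, Object> createStormConf(Map<?, ?> userConf) {
        Map<Object, Object> stormConf = new HashMap<>();
        if (userConf != null) {
            stormConf.putAll(userConf);
        }
        return stormConf;
    }
}

// Hypothetical spout wrapper: delegates to the shared helper.
final class SpoutWrapperSketch {
    Map<Object, Object> stormConf;

    void open(Map<?, ?> userConf) {
        stormConf = SetupHelperSketch.createStormConf(userConf);
    }
}

// Hypothetical bolt wrapper: calls the same helper, no duplicated code.
final class BoltWrapperSketch {
    Map<Object, Object> stormConf;

    void open(Map<?, ?> userConf) {
        stormConf = SetupHelperSketch.createStormConf(userConf);
    }
}
```

With this shape, a later change to how the map is built only has to be made in one place.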
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723414#comment-14723414
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312802
--- Diff:
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -275,7 +275,14 @@
* Path to Hadoop configuration
*/
public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
-
+
+ // Storm Configuration
+
+ /**
+* storm configuration
+*/
+ public static final String STORM_DEFAULT_CONFIG = "storm.config";
--- End diff --
I think we should put this somewhere else... Maybe add a new class in the
`stormcompatibility.util` package. @StephanEwen, what is your opinion?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723400#comment-14723400
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38311822
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ stormConf = new HashMap();
+
+ /* parameters is task configuration, we can get storm
configuration only from job configuration */
+ RuntimeContext ctx = super.getRuntimeContext();
+ if (ctx instanceof StreamingRuntimeContext) {
+ Configuration jobConfiguration =
((StreamingRuntimeContext) ctx).getJobConfiguration();
+
--- End diff --
Why do you not use the function input parameter of `open(Configuration
parameters)`? Is it different?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723411#comment-14723411
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312505
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
---
@@ -35,6 +35,10 @@
private String line;
private boolean newLineRead;
+ public FiniteStormFileSpout() {
+ super();
--- End diff --
Not necessary; `super()` is called automatically. Make it a single line: `public
FiniteStormFileSpout() {}`
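The point about the implicit `super()` call can be checked with a small stand-alone sketch. The class names below are illustrative only and unrelated to the real spout classes: when a constructor body does not start with an explicit `super(...)` or `this(...)` call, the Java compiler inserts a call to the no-argument superclass constructor.

```java
// Hypothetical base class that counts how often its constructor runs.
class BaseSpoutSketch {
    static int baseConstructorCalls = 0;

    BaseSpoutSketch() {
        baseConstructorCalls++;
    }
}

// Hypothetical subclass with an empty, single-line constructor.
class FileSpoutSketch extends BaseSpoutSketch {
    // No explicit super(): the compiler inserts the call to BaseSpoutSketch().
    public FileSpoutSketch() {}
}

class ImplicitSuperDemo {
    public static void main(String[] args) {
        int before = BaseSpoutSketch.baseConstructorCalls;
        new FileSpoutSketch();
        // The base constructor ran exactly once for the new instance.
        System.out.println(BaseSpoutSketch.baseConstructorCalls - before);
    }
}
```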
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723405#comment-14723405
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r38312152
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
---
@@ -101,20 +143,23 @@ public String map(String value) throws Exception {
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
+ private static int exclamationNum;
private static boolean parseParameters(final String[] args) {
if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if (args.length == 2) {
+ if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+ exclamationNum = Integer.parseInt(args[2]);
} else {
System.err.println("Usage:
ExclamationWithStormBolt ");
--- End diff --
Please extend the parameter list in the usage message.
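As a rough sketch of what the review comment asks for, the argument parsing could report all three expected arguments when the count is wrong. The class name `ExclamationArgsSketch`, the placeholder names in the usage text, and the extra handling of a non-numeric third argument are assumptions for illustration; they are not taken from the pull request.

```java
// Hypothetical sketch; the placeholder names in the usage text are assumptions.
final class ExclamationArgsSketch {
    static boolean fileOutput = false;
    static String textPath;
    static String outputPath;
    static int exclamationNum;

    private static final String USAGE =
        "Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>";

    static boolean parseParameters(final String[] args) {
        if (args.length > 0) {
            // parse input arguments
            fileOutput = true;
            if (args.length == 3) {
                textPath = args[0];
                outputPath = args[1];
                try {
                    exclamationNum = Integer.parseInt(args[2]);
                } catch (NumberFormatException e) {
                    // third argument is not a number: report usage instead of crashing
                    System.err.println(USAGE);
                    return false;
                }
            } else {
                System.err.println(USAGE);
                return false;
            }
        }
        // no arguments: fall back to defaults, as in the original method
        return true;
    }
}
```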
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14721918#comment-14721918
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-136241503
@mjsax @StephanEwen I have finished the code changes:
1. Serialize the Storm Config as a byte[] into the Flink configuration.
2. Extend ExclamationTopology such that the number of added `!` in
ExclamationBolt and ExclamationWithStormSpout.ExclamationMap is configurable,
and adapt the tests.
3. Extend FiniteStormFileSpout and its base class with an empty constructor and
configure the file to be opened via the Storm configuration Map.
I have run the flink-storm-compatibility tests successfully on my local machine
and do not know why CI failed.
Can you have a look at my code? Thank you very much.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716575#comment-14716575
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135406207
Sorry, but I don't understand your question...
Add configuration support in Storm-compatibility
Key: FLINK-2525
URL: https://issues.apache.org/jira/browse/FLINK-2525
Project: Flink
Issue Type: New Feature
Components: flink-contrib
Reporter: fangfengbin
Assignee: fangfengbin
Spouts and Bolts are initialized by a call to `Spout.open(...)` and
`Bolt.prepare()`, respectively. Both methods have a config `Map` as first
parameter. This map is currently not populated. Thus, Spouts and Bolts cannot
be configured with user-defined parameters. In order to support this feature,
spout and bolt wrapper classes need to be extended to create a proper `Map`
object. Furthermore, the clients need to be extended to take a `Map`,
translate it into a Flink `Configuration` that is forwarded to the wrappers
for proper initialization of the map.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716603#comment-14716603
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135411850
Can you try to serialize the whole `Map` into a single `byte[]`?
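One way to serialize the whole map into a single `byte[]` is plain Java serialization, sketched below. `StormConfCodecSketch` and its method names are hypothetical and are not the code from the pull request. The sketch assumes that every key and value in the map is `Serializable`, and it copies the input into a `HashMap` so the concrete map type is known to be serializable as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical codec; assumes all keys and values are Serializable.
final class StormConfCodecSketch {
    private StormConfCodecSketch() {}

    // Packs the complete map, including non-String keys, into one byte array.
    static byte[] serialize(Map<?, ?> conf) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new HashMap<Object, Object>(conf));
        }
        return bytes.toByteArray();
    }

    // Unpacks the byte array back into a map on the receiving side.
    @SuppressWarnings("unchecked")
    static Map<Object, Object> deserialize(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Map<Object, Object>) in.readObject();
        }
    }
}
```

The resulting array could then be stored under a single `String` key in the Flink `Configuration` and unpacked again inside the wrappers, which sidesteps the restriction that Flink configuration keys are `String`s.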
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716605#comment-14716605
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135412789
Oh, you are right. Serializing the whole Map into a single byte[] is
better. Thanks.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716525#comment-14716525
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135399187
I think the `byte[]` conversion approach is the correct way to go. Storm
config keys need not be `String`s, thus the specific prefix trick cannot be
applied.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716551#comment-14716551
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135403187
@StephanEwen Thanks. The keys of the Storm config are objects, so maybe the
confData (`HashMap<String, Object>`) of `Configuration` is not enough.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716555#comment-14716555
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135403521
@ffbin: Can you check whether you can simply put the serialized Storm Config as a
byte[] into the Flink configuration? You can then unpack it inside the Storm
code, when needed.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716571#comment-14716571
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135405543
@mjsax Hi. I want to make the number of added '!' in ExclamationBolt and
ExclamationWithStormSpout.ExclamationMap configurable via the prepare() / open()
functions. The number can be obtained from jobConfiguration. What is your
suggestion? Thanks.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14714436#comment-14714436
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135085374
Putting a nested stormConf into the configuration seems just wrong,
sorry. Such a specific hack in a generic utility cannot yield maintainable code.
Why is that needed in the first place? Why not have a dedicated
configuration object for storm?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14713053#comment-14713053
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37976623
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
---
@@ -52,6 +77,38 @@ public void testRunExecuteCancelInfinite() throws
Exception {
}
@Test
+ public void testOpen() throws Exception {
+ final IRichSpout spout = mock(IRichSpout.class);
+ final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new
StormSpoutWrapper<Tuple1<Integer>>(spout);
+
+ Map stormConf = new HashMap();
+ stormConf.put(new String("path"), new
String("/home/user/file.txt"));
+ stormConf.put(1, 1024);
+ Configuration jobConfiguration = new Configuration();
+ jobConfiguration.putStormConf(stormConf);
+ Environment env = new RuntimeEnvironment(new JobID(), new
JobVertexID(), new ExecutionAttemptID(),
+ new String(), new String(), 1, 2,
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+ mock(MemoryManager.class),
mock(IOManager.class), mock(BroadcastVariableManager.class),
+ mock(AccumulatorRegistry.class),
mock(InputSplitProvider.class), mock(Map.class),
+ new ResultPartitionWriter[1], new InputGate[1],
mock(ActorGateway.class),
+ mock(TaskManagerRuntimeInfo.class));
+ StreamingRuntimeContext runtimeContext = new
StreamingRuntimeContext(env, new ExecutionConfig(),
+ mock(KeySelector.class),
+ mock(StateHandleProvider.class),
mock(Map.class));
+
+ spoutWrapper.setRuntimeContext(runtimeContext);
+ spoutWrapper.open(mock(Configuration.class));
+ final SourceFunction.SourceContext ctx =
mock(SourceFunction.SourceContext.class);
+ spoutWrapper.cancel();
+ spoutWrapper.run(ctx);
+
+ Map mapExpect = new HashMap();
+ mapExpect.put(new String("path"), new
String("/home/user/file.txt"));
+ mapExpect.put(1, 1024);
+ verify(spout).open(eq(mapExpect), any(TopologyContext.class),
any(SpoutOutputCollector.class));
--- End diff --
Use `stormConf` instead of `mapExpect`
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14713050#comment-14713050
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37976536
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
---
@@ -222,6 +240,36 @@ public void testOpenSink() throws Exception {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenWithStormConf() throws Exception {
+ final IRichBolt bolt = mock(IRichBolt.class);
+ final StormBoltWrapper<Object, Object> wrapper = new
StormBoltWrapper<Object, Object>(bolt);
+
+ Map stormConf = new HashMap();
+ stormConf.put(new String("path"), new
String("/home/user/file.txt"));
+ stormConf.put(1, 1024);
+ Configuration jobConfiguration = new Configuration();
+ jobConfiguration.putStormConf(stormConf);
+ Environment env = new RuntimeEnvironment(new JobID(), new
JobVertexID(), new ExecutionAttemptID(),
+ new String(), new String(), 1, 2,
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+ mock(MemoryManager.class),
mock(IOManager.class), mock(BroadcastVariableManager.class),
+ mock(AccumulatorRegistry.class),
mock(InputSplitProvider.class), mock(Map.class),
+ new ResultPartitionWriter[1], new InputGate[1],
mock(ActorGateway.class),
+ mock(TaskManagerRuntimeInfo.class));
+ StreamingRuntimeContext ctx = new StreamingRuntimeContext(env,
new ExecutionConfig(),
+ mock(KeySelector.class),
+ mock(StateHandleProvider.class),
mock(Map.class));
+
+ wrapper.setup(mock(Output.class), ctx);
+ wrapper.open(mock(Configuration.class));
+
+ Map mapExpect = new HashMap();
+ mapExpect.put(new String("path"), new
String("/home/user/file.txt"));
+ mapExpect.put(1, 1024);
+ verify(bolt).prepare(eq(mapExpect), any(TopologyContext.class),
any(OutputCollector.class));
--- End diff --
Remove `mapExpect` and use `stormConf` from above to avoid code
duplication.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14713352#comment-14713352
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135006223
@ffbin Can you extend `ExclamationTopology` such that the number of added
`!` in `ExclamationBolt` and `ExclamationWithStormSpout.ExclamationMap` is
configurable, and adapt the tests accordingly? Please add an additional user
parameter to `ExclamationWithStormSpout` and `ExclamationWithStormBolt`.
Furthermore, please **extend** `FiniteStormFileSpout` or its base class with an
empty constructor and configure the file to be opened via the Storm
configuration Map for this case.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14713070#comment-14713070
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-135000384
Can anyone have a look at `Configuration.java`? I am not sure if the changes
are OK.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14709154#comment-14709154
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37744576
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ /* parameters is task configuration, we can get storm
configuration only from job configuration */
+ RuntimeContext ctx = super.getRuntimeContext();
+ if (ctx instanceof StreamingRuntimeContext)
+ {
+ Configuration jobConfiguration =
((StreamingRuntimeContext) ctx).getJobConfiguration();
+ stormConf = new HashMap<String, Object>();
+ stormConf.putAll(jobConfiguration.getConfData());
--- End diff --
I will add a null check. In fact, jobConfiguration cannot be null in practice.
It can only be null in the StormBoltWrapper unit test, because that test mocks
the StreamingRuntimeContext object.
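A minimal sketch of the null check under discussion, written without Flink classes so it stays self-contained. `buildStormConf` and its `jobConfData` parameter are hypothetical names; `jobConfData` stands in for whatever the wrapper reads from the job configuration and may be null, for example when the runtime context is a mock.

```java
import java.util.HashMap;
import java.util.Map;

final class NullSafeStormConf {
    /**
     * Builds the map passed to the spout or bolt.
     * Falls back to an empty map when no job configuration data is available.
     */
    static Map<String, Object> buildStormConf(Map<String, Object> jobConfData) {
        Map<String, Object> stormConf = new HashMap<>();
        if (jobConfData != null) {
            stormConf.putAll(jobConfData);
        }
        return stormConf;
    }

    public static void main(String[] args) {
        // A mocked context may hand back no data at all.
        System.out.println(buildStormConf(null).isEmpty());
    }
}
```

Returning an empty map rather than null means user code in `open(...)` or `prepare(...)` can call `get` without its own guard.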
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709169#comment-14709169
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37745377
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName,
final Map<?, ?> conf, fina
public void submitTopologyWithOpts(final String topologyName, final
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
- ClusterUtil
-
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName),
topology.getNumberOfTasks());
+ JobGraph jobGraph =
topology.getStreamGraph().getJobGraph(topologyName);
+ Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+ /* storm conf type must be Map<String, Object> */
+ Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --
The configuration in Storm is public class Config extends HashMap<String,
Object>. Since it extends HashMap<String, Object>, if I use an untyped Map, it
may be hard to convert it into a Storm Config.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709194#comment-14709194
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37746515
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName,
final Map<?, ?> conf, fina
public void submitTopologyWithOpts(final String topologyName, final
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
- ClusterUtil
-
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName),
topology.getNumberOfTasks());
+ JobGraph jobGraph =
topology.getStreamGraph().getJobGraph(topologyName);
+ Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+ /* storm conf type must be Map<String, Object> */
+ Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --
The definition of `Config` is not important here. Storm uses a raw `Map` as
parameter in `StormSubmitter.submitTopology(...)`, `Spout.open(...)`, and
`Bolt.prepare(...)`. Thus, it would be possible to use a type other than
`String` for the keys.
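To illustrate the point about raw maps: an unchecked cast to `Map<String, Object>` always succeeds at the cast itself and only fails later, when a non-`String` key is actually read. The sketch below is plain Java with hypothetical helper names (`copyConf`, `castToStringKeysFails`); it does not use Storm or Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

final class RawMapKeys {
    /** Copies any config map without assuming that the keys are Strings. */
    static Map<Object, Object> copyConf(Map<?, ?> conf) {
        Map<Object, Object> copy = new HashMap<>();
        if (conf != null) {
            copy.putAll(conf);
        }
        return copy;
    }

    /** Returns true if treating the keys as Strings fails at runtime. */
    @SuppressWarnings("unchecked")
    static boolean castToStringKeysFails(Map<?, ?> conf) {
        // Unchecked cast: generics are erased, so nothing is verified here.
        Map<String, Object> typed = (Map<String, Object>) conf;
        try {
            for (String key : typed.keySet()) {
                key.length(); // the implicit cast to String happens per element
            }
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Object, Object> conf = new HashMap<>();
        conf.put(42, "value stored under an Integer key");

        System.out.println(castToStringKeysFails(conf));
        System.out.println(copyConf(conf).get(42));
    }
}
```

Accepting `Map<?, ?>` (or a raw `Map`) and copying entry by entry avoids the deferred `ClassCastException` at the cost of losing the typed view.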
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709281#comment-14709281
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37750149
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName,
final Map<?, ?> conf, fina
public void submitTopologyWithOpts(final String topologyName, final
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
- ClusterUtil
-
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName),
topology.getNumberOfTasks());
+ JobGraph jobGraph =
topology.getStreamGraph().getJobGraph(topologyName);
+ Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+ /* storm conf type must be Map<String, Object> */
+ Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --
Thanks. I will change this. Maybe the Configuration should get an additional
map, because HashMap<String, Object> confData is not enough.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709180#comment-14709180
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37745758
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenWithStormConf() throws Exception {
+ final IRichBolt bolt = mock(IRichBolt.class);
+ final StormBoltWrapper<Object, Object> wrapper = new
StormBoltWrapper<Object, Object>(bolt);
+
+ Configuration jobConfiguration = new Configuration();
+ jobConfiguration.setString(new String("path"), new
String("/home/user/file.txt"));
+ jobConfiguration.setInteger(new String("delimitSize"), 1024);
+ Environment env = new RuntimeEnvironment(new JobID(), new
JobVertexID(), new ExecutionAttemptID(),
+ new String(), new String(), 1, 2,
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+ mock(MemoryManager.class),
mock(IOManager.class), mock(BroadcastVariableManager.class),
+ mock(AccumulatorRegistry.class),
mock(InputSplitProvider.class), mock(Map.class),
+ new ResultPartitionWriter[1], new InputGate[1],
mock(ActorGateway.class),
+ mock(TaskManagerRuntimeInfo.class));
+ StreamingRuntimeContext ctx = new StreamingRuntimeContext(env,
new ExecutionConfig(),
+ mock(KeySelector.class),
+ mock(StateHandleProvider.class),
mock(Map.class));
+
+ wrapper.setup(mock(Output.class), ctx);
+ wrapper.open(mock(Configuration.class));
+
+ verify(bolt).prepare(any(Map.class),
any(TopologyContext.class), any(OutputCollector.class));
--- End diff --
Thanks. I will check the map.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709207#comment-14709207
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-134184818
@mjsax Thank you very much. I missed the change in FlinkClient. I will fix it
and test via bin/start-local.sh. In China, we currently cannot see the CI
details, so it is hard to know why CI failed. Thank you for the reminder.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709226#comment-14709226
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-134190885
It fails in two tests. You should actually see this if you execute the tests
locally. You should run the tests each time before you open or update a PR (at
least for the modules you changed).
```
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.751 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.BoltSplitITCase
testTopology(org.apache.flink.stormcompatibility.split.BoltSplitITCase)  Time elapsed: 0.635 sec  <<< ERROR!
java.lang.NullPointerException: null
	at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
	at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
	at org.apache.flink.stormcompatibility.split.StormSplitStreamBoltLocal.main(StormSplitStreamBoltLocal.java:42)
	at org.apache.flink.stormcompatibility.split.BoltSplitITCase.testTopology(BoltSplitITCase.java:25)
```
and
```
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.704 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.SpoutSplitITCase
testTopology(org.apache.flink.stormcompatibility.split.SpoutSplitITCase)  Time elapsed: 0.584 sec  <<< ERROR!
java.lang.NullPointerException: null
	at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
	at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
	at org.apache.flink.stormcompatibility.split.StormSplitStreamSpoutLocal.main(StormSplitStreamSpoutLocal.java:42)
	at org.apache.flink.stormcompatibility.split.SpoutSplitITCase.testTopology(SpoutSplitITCase.java:25)
```
Just out of curiosity: why can you not see Travis details?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709261#comment-14709261
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user ffbin commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-134198533
@mjsax The reason I cannot see the Travis details is the following (quoted
from their reply mail):
The problem is that our CDN is currently blocked in mainland China. I'm
talking to our CDN provider right now for getting a custom SSL certificate and
domain set up, so we should be usable from China within the next weeks
hopefully.
I will fix the code. I only ran the tests of the core module and missed the
tests in the examples. It is my fault. Thanks!
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709176#comment-14709176
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37745589
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ /* parameters is task configuration, we can get storm
configuration only from job configuration */
+ RuntimeContext ctx = super.getRuntimeContext();
+ if (ctx instanceof StreamingRuntimeContext)
+ {
+ Configuration jobConfiguration =
((StreamingRuntimeContext) ctx).getJobConfiguration();
+ stormConf = new HashMap<String, Object>();
+ stormConf.putAll(jobConfiguration.getConfData());
--- End diff --
Just because it cannot be `null` in a test does not mean it cannot be `null`
in a real cluster deployment...
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709189#comment-14709189
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37746137
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName,
final Map<?, ?> conf, fina
public void submitTopologyWithOpts(final String topologyName, final
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
- ClusterUtil
-
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName),
topology.getNumberOfTasks());
+ JobGraph jobGraph =
topology.getStreamGraph().getJobGraph(topologyName);
+ Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+ /* storm conf type must be Map<String, Object> */
+ Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --
Your comment is not rendered correctly. Markdown is supported: for inline
source code, use a single backtick at the beginning and end. You meant `public
class Config extends HashMap<String, Object>` but `<String, Object>` is not
shown above.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709179#comment-14709179
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37745701
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName,
final Map<?, ?> conf, fina
public void submitTopologyWithOpts(final String topologyName, final
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
- ClusterUtil
-
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName),
topology.getNumberOfTasks());
+ JobGraph jobGraph =
topology.getStreamGraph().getJobGraph(topologyName);
+ Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+ /* storm conf type must be Map<String, Object> */
+ Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --
It is not about `HashMap` vs `Map`. It's about the generic types. You
should not use `Map<String,Object>` but the raw type `Map`.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708995#comment-14708995
]
ASF GitHub Bot commented on FLINK-2525:
---
GitHub user ffbin opened a pull request:
https://github.com/apache/flink/pull/1046
[FLINK-2525]Add configuration support in Storm-compatibility
- Enable the config to be used in Spout.open() and Bolt.prepare().
For example:
public static void main(final String[] args) {
    String topologyId = "Streaming WordCount";
    final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    ...
    final Config conf = new Config();
    conf.put("wordsFile", "/home/user/");
    conf.put("delimitSize", 1024);
    final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology(topologyId, conf, builder.createTopology());
    Utils.sleep(10 * 1000);
    cluster.killTopology(topologyId);
    cluster.shutdown();
}
public class WordReader implements IRichSpout {
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            // The raw Map returns Object, so the value must be cast.
            this.fileReader = new FileReader((String) conf.get("wordsFile"));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["
                    + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }
}
public final class StormBoltTokenizer implements IRichBolt {
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.delimitSize = (Integer) stormConf.get("delimitSize");
        this.collector = collector;
    }
}
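Because `open(...)` and `prepare(...)` receive a raw `Map`, every lookup returns `Object` and a missing or mistyped entry only shows up at runtime. One defensive way to read such values is sketched below; `ConfValues`, `getString` and `getInt` are hypothetical helpers and not part of Storm or Flink.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfValues {
    /** Returns the value for key if it is a String, otherwise the default. */
    static String getString(Map<?, ?> conf, String key, String defaultValue) {
        Object value = conf.get(key);
        return (value instanceof String) ? (String) value : defaultValue;
    }

    /** Returns the value for key if it is numeric, otherwise the default. */
    static int getInt(Map<?, ?> conf, String key, int defaultValue) {
        Object value = conf.get(key);
        return (value instanceof Number) ? ((Number) value).intValue() : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("wordsFile", "/home/user/");
        conf.put("delimitSize", 1024);

        System.out.println(getString(conf, "wordsFile", "missing"));
        System.out.println(getInt(conf, "delimitSize", -1));
        System.out.println(getInt(conf, "notConfigured", -1));
    }
}
```

Falling back to a default hides configuration mistakes; throwing an exception for a missing mandatory key would be an equally reasonable choice.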
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ffbin/flink FLINK-2525
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1046.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1046
commit c6aebc10b7a010cc9cd5fb5b6505fdbc942ab7b9
Author: ffbin 869218...@qq.com
Date: 2015-08-24T09:07:26Z
[FLINK-2525]Add configuration support in Storm-compatibility
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709106#comment-14709106
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37741197
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
---
@@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ /* parameters is task configuration, we can get storm
configuration only from job configuration */
+ RuntimeContext ctx = super.getRuntimeContext();
+ if (ctx instanceof StreamingRuntimeContext)
+ {
+ Configuration jobConfiguration =
((StreamingRuntimeContext) ctx).getJobConfiguration();
+ stormConf = new HashMap<String, Object>();
+ stormConf.putAll(jobConfiguration.getConfData());
--- End diff --
Missing `null` test on `jobConfiguration` (or is this test not necessary? It
is done in StormBoltWrapper.)
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709108#comment-14709108
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37741352
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenWithStormConf() throws Exception {
+ final IRichBolt bolt = mock(IRichBolt.class);
+ final StormBoltWrapper<Object, Object> wrapper = new
StormBoltWrapper<Object, Object>(bolt);
+
+ Configuration jobConfiguration = new Configuration();
+ jobConfiguration.setString(new String("path"), new
String("/home/user/file.txt"));
+ jobConfiguration.setInteger(new String("delimitSize"), 1024);
+ Environment env = new RuntimeEnvironment(new JobID(), new
JobVertexID(), new ExecutionAttemptID(),
+ new String(), new String(), 1, 2,
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+ mock(MemoryManager.class),
mock(IOManager.class), mock(BroadcastVariableManager.class),
+ mock(AccumulatorRegistry.class),
mock(InputSplitProvider.class), mock(Map.class),
+ new ResultPartitionWriter[1], new InputGate[1],
mock(ActorGateway.class),
+ mock(TaskManagerRuntimeInfo.class));
+ StreamingRuntimeContext ctx = new StreamingRuntimeContext(env,
new ExecutionConfig(),
+ mock(KeySelector.class),
+ mock(StateHandleProvider.class),
mock(Map.class));
+
+ wrapper.setup(mock(Output.class), ctx);
+ wrapper.open(mock(Configuration.class));
+
+ verify(bolt).prepare(any(Map.class),
any(TopologyContext.class), any(OutputCollector.class));
--- End diff --
You should not check for `any(Map.class)` but check if the map contains the
value you set above in `jobConfiguration`
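The idea behind this review comment (assert on the map's contents rather than on "some map") can be sketched without Mockito by using a hand-written recording fake. Everything here is hypothetical and only models the config argument: `PreparableBolt`, `RecordingBolt` and `open` are illustrative names, not classes from the pull request. With Mockito itself, an `ArgumentCaptor` or an `eq(expectedMap)` matcher would serve the same purpose.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfAssertionSketch {
    /** Hypothetical stand-in for a bolt; only the config argument of prepare is modelled. */
    interface PreparableBolt {
        void prepare(Map<?, ?> conf);
    }

    /** Hand-written fake that records the map it was given. */
    static final class RecordingBolt implements PreparableBolt {
        Map<?, ?> received;

        @Override
        public void prepare(Map<?, ?> conf) {
            received = conf;
        }
    }

    /** Hypothetical wrapper logic: forwards a copy of the job configuration data. */
    static void open(PreparableBolt bolt, Map<String, Object> jobConfData) {
        bolt.prepare(new HashMap<String, Object>(jobConfData));
    }

    public static void main(String[] args) {
        Map<String, Object> jobConfData = new HashMap<>();
        jobConfData.put("path", "/home/user/file.txt");
        jobConfData.put("delimitSize", 1024);

        RecordingBolt bolt = new RecordingBolt();
        open(bolt, jobConfData);

        // Check the actual entries instead of accepting any map.
        if (!jobConfData.equals(bolt.received)) {
            throw new AssertionError("bolt did not receive the configured values");
        }
        System.out.println("config forwarded: " + bolt.received.size() + " entries");
    }
}
```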
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709107#comment-14709107
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37741303
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenWithStormConf() throws Exception {
+ final IRichBolt bolt = mock(IRichBolt.class);
+ final StormBoltWrapper<Object, Object> wrapper = new
StormBoltWrapper<Object, Object>(bolt);
+
+ Configuration jobConfiguration = new Configuration();
+ jobConfiguration.setString(new String("path"), new
String("/home/user/file.txt"));
+ jobConfiguration.setInteger(new String("delimitSize"), 1024);
+ Environment env = new RuntimeEnvironment(new JobID(), new
JobVertexID(), new ExecutionAttemptID(),
+ new String(), new String(), 1, 2,
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+ mock(MemoryManager.class),
mock(IOManager.class), mock(BroadcastVariableManager.class),
+ mock(AccumulatorRegistry.class),
mock(InputSplitProvider.class), mock(Map.class),
+ new ResultPartitionWriter[1], new InputGate[1],
mock(ActorGateway.class),
+ mock(TaskManagerRuntimeInfo.class));
+ StreamingRuntimeContext ctx = new StreamingRuntimeContext(env,
new ExecutionConfig(),
+ mock(KeySelector.class),
+ mock(StateHandleProvider.class),
mock(Map.class));
+
+ wrapper.setup(mock(Output.class), ctx);
+ wrapper.open(mock(Configuration.class));
--- End diff --
Why do you mock here and not use `jobConfiguration`?
Add configuration support in Storm-compatibility
Key: FLINK-2525
URL: https://issues.apache.org/jira/browse/FLINK-2525
Project: Flink
Issue Type: New Feature
Components: flink-contrib
Reporter: fangfengbin
Assignee: fangfengbin
Spouts and Bolts are initialized by a call to `Spout.open(...)` and
`Bolt.prepare()`, respectively. Both methods have a config `Map` as first
parameter. This map is currently not populated. Thus, Spouts and Bolts cannot
be configured with user-defined parameters. In order to support this feature,
spout and bolt wrapper classes need to be extended to create a proper `Map`
object. Furthermore, the clients need to be extended to take a `Map` and
translate it into a Flink `Configuration` that is forwarded to the wrappers
for proper initialization of the map.
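
The round trip described above (client takes a `Map`, ships it with the job, wrapper rebuilds the `Map`) might be sketched as follows. This is only an illustration under stated assumptions, not the code from the pull request: it uses plain JDK serialization, a `HashMap<String, byte[]>` stands in for the Flink `Configuration`, and the class name `StormConfRoundTrip` and the key `storm.conf` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class StormConfRoundTrip {

    // Hypothetical key under which the serialized Storm config is stored.
    static final String STORM_CONF_KEY = "storm.conf";

    /** Client side: turn the untyped Storm config into bytes (entries must be Serializable). */
    static byte[] serialize(Map<?, ?> stormConf) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        // Copy into a HashMap so the written object is known to be Serializable.
        out.writeObject(new HashMap<Object, Object>(stormConf));
        out.close();
        return bytes.toByteArray();
    }

    /** Wrapper side: rebuild the Map that is handed to open(...) / prepare(...). */
    static Map<?, ?> deserialize(byte[] data) throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
        try {
            return (Map<?, ?>) in.readObject();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<Object, Object> stormConf = new HashMap<Object, Object>();
        stormConf.put("path", "/home/user/file.txt");
        stormConf.put("delimitSize", 1024);

        // Stand-in for the job configuration that travels with the job.
        Map<String, byte[]> jobConfiguration = new HashMap<String, byte[]>();
        jobConfiguration.put(STORM_CONF_KEY, serialize(stormConf));

        Map<?, ?> restored = deserialize(jobConfiguration.get(STORM_CONF_KEY));
        System.out.println("restored equals original: " + restored.equals(stormConf));
    }
}
```

Serializing the whole map under one key avoids constraining key or value types, at the cost of requiring every entry to be `Serializable`.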
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709109#comment-14709109
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37741464
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
---
@@ -52,6 +75,32 @@ public void testRunExecuteCancelInfinite() throws Exception {
     }
     @Test
+    public void testOpen() throws Exception {
+        final IRichSpout spout = mock(IRichSpout.class);
+        final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
+
+        Configuration jobConfiguration = new Configuration();
+        jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
+        jobConfiguration.setInteger(new String("delimitSize"), 1024);
+        Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+                new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+                mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
+                mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
+                new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
+                mock(TaskManagerRuntimeInfo.class));
+        StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(),
+                mock(KeySelector.class),
+                mock(StateHandleProvider.class), mock(Map.class));
+
+        spoutWrapper.setRuntimeContext(runtimeContext);
+        spoutWrapper.open(mock(Configuration.class));
+        final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class);
+        spoutWrapper.cancel();
+        spoutWrapper.run(ctx);
+        verify(spout).open(any(Map.class), any(TopologyContext.class), any(SpoutOutputCollector.class));
+    }
+
--- End diff --
Same comment as in `StormBoltWrapperTest`.
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709113#comment-14709113
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37741748
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
     public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
             final SubmitOptions submitOpts) throws Exception {
-        ClusterUtil
-                .startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+        JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+        Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+        /* storm conf type must be Map<String, Object> */
+        Map<String, Object> stormConf = (Map<String, Object>)conf;
The configuration in Storm is an untyped `Map`. Casting it to
`Map<String, Object>` might fail. Flink should not limit the `Map` in this way.
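
To make the concern concrete, here is a small, self-contained sketch in plain Java (no Flink or Storm classes; `RawMapCastDemo` and its methods are hypothetical names used only for illustration). Because of type erasure the unchecked cast itself never throws; the `ClassCastException` only appears later, when a key is actually read as a `String`. The `copyStringKeys` helper shows one possible defensive alternative, which is an assumption and not necessarily what the pull request should do.

```java
import java.util.HashMap;
import java.util.Map;

public class RawMapCastDemo {

    /** Copies only entries with String keys; skipping other keys is an illustrative policy. */
    static Map<String, Object> copyStringKeys(Map<?, ?> conf) {
        Map<String, Object> result = new HashMap<String, Object>();
        for (Map.Entry<?, ?> entry : conf.entrySet()) {
            if (entry.getKey() instanceof String) {
                result.put((String) entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    /** Returns true if reading the keys as Strings fails after the unchecked cast. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean uncheckedCastFails(Map conf) {
        // Compiles with a warning and never throws at this line (type erasure).
        Map<String, Object> typed = (Map<String, Object>) conf;
        try {
            for (String key : typed.keySet()) { // implicit cast to String happens here
                key.length();
            }
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Object, Object> stormConf = new HashMap<Object, Object>();
        stormConf.put("path", "/home/user/file.txt");
        stormConf.put(42, "value stored under a non-String key");

        System.out.println("unchecked cast fails later: " + uncheckedCastFails(stormConf));
        System.out.println("String-keyed entries kept: " + copyStringKeys(stormConf).size());
    }
}
```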
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709125#comment-14709125
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-134162565
I don't see any changes in `FlinkClient`, only in `FlinkLocalCluster`. Did you
test by starting Flink via `bin/start-local.sh` (it would be even better to
test in a real cluster)?
[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709131#comment-14709131
]
ASF GitHub Bot commented on FLINK-2525:
---
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/1046#issuecomment-134162997
Travis fails because you broke something...