[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14940918#comment-14940918
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1046


> Add configuration support in Storm-compatibility
> 
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
> Issue Type: New Feature
> Components: Storm Compatibility
> Reporter: fangfengbin
> Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare(...)`, respectively. Both methods take a config `Map` as their 
> first parameter. This map is currently not populated. Thus, Spouts and Bolts 
> cannot be configured with user-defined parameters. To support this feature, 
> the spout and bolt wrapper classes need to be extended to create a proper 
> `Map` object. Furthermore, the clients need to be extended to take a `Map` 
> and translate it into a Flink `Configuration` that is forwarded to the 
> wrappers for proper initialization of the map.
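
The flow described in the issue can be illustrated with a minimal, plain-Java sketch. It uses no Flink or Storm classes: `SimpleSpout`, `SpoutWrapper`, `toForwardableConfig`, and `runOnce` are hypothetical names introduced only for illustration, and the real `open(...)` method takes further arguments that are omitted here. The sketch also assumes, as a simplification, that all values are flattened to strings before being forwarded.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigForwardingSketch {

    /** Hypothetical stand-in for a spout; only the config argument of open() is modeled. */
    interface SimpleSpout {
        void open(Map<String, Object> conf);
    }

    /** Hypothetical wrapper: keeps the forwarded key/value pairs and builds the Map for the spout. */
    static final class SpoutWrapper {
        private final SimpleSpout spout;
        private final Map<String, String> forwarded;

        SpoutWrapper(SimpleSpout spout, Map<String, String> forwarded) {
            this.spout = spout;
            this.forwarded = forwarded;
        }

        void open() {
            // Create a fresh, populated Map instead of passing an empty one.
            spout.open(new HashMap<String, Object>(forwarded));
        }
    }

    /** Client side: flatten the user map into string pairs (simplifying assumption). */
    static Map<String, String> toForwardableConfig(Map<String, Object> userConf) {
        Map<String, String> flat = new HashMap<>();
        for (Map.Entry<String, Object> entry : userConf.entrySet()) {
            flat.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return flat;
    }

    /** Runs the whole path once and returns what the spout received in open(). */
    static Map<String, Object> runOnce(Map<String, Object> userConf) {
        final Map<String, Object> seen = new HashMap<>();
        SimpleSpout spout = seen::putAll;
        new SpoutWrapper(spout, toForwardableConfig(userConf)).open();
        return seen;
    }

    public static void main(String[] args) {
        Map<String, Object> userConf = new HashMap<>();
        userConf.put("word.separator", ",");
        System.out.println(runOnce(userConf));
    }
}
```

The point of the sketch is only the direction of data flow: user `Map` on the client, a forwardable representation in between, and a rebuilt `Map` handed to the spout at initialization time.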



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-10-02 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14940920#comment-14940920
 ] 

Matthias J. Sax commented on FLINK-2525:


Fixed via 9fe285a77de5cd1a35ceb58f9295751fd3dd9e15

Thanks for your contribution!



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900123#comment-14900123
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-141851757
  
Hi @fhueske, I will update the PR to use ExecutionConfig. Thanks.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14876240#comment-14876240
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-141550369
  
Hi @ffbin,
I am not sure whether you followed the discussion on the mailing list, but we 
discussed using the ExecutionConfig instead of the JobConfig. The reason is 
that ExecutionConfig is user-facing, whereas JobConfig is used for 
system-internal configuration. 

See the discussion 
[here](https://mail-archives.apache.org/mod_mbox/flink-dev/201509.mbox/%3CCANC1h_tVL1NHpoYrB9LaGRUP=tsyn2cpd2zfjoop2bhuddp...@mail.gmail.com%3E).

It would be nice if you could update the PR to use ExecutionConfig. 
Thanks a lot, Fabian




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744911#comment-14744911
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140291773
  
As per the discussion on the dev list, the `ExecutionConfig` has the 
`GlobalJobParameters`, which are useful if one type of config is used across 
all operators.

If each of the operators needs its own config, can you create an abstract 
base class for the Storm functions that takes a configuration as an argument?

BTW: There is no plan to remove the `withParameters()` method in the batch 
API. It is just not the encouraged mechanism any more...
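
The per-operator alternative mentioned above (an abstract base class that takes a configuration as a constructor argument) might look roughly like the following plain-Java sketch. It is not taken from the pull request: `ConfiguredFunction`, `PrefixFunction`, and the `prefix` key are hypothetical names chosen for illustration, and no Flink or Storm types are involved.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class PerOperatorConfigSketch {

    /** Hypothetical base class: every function receives its own configuration at construction. */
    abstract static class ConfiguredFunction {
        private final Map<String, String> conf;

        protected ConfiguredFunction(Map<String, String> conf) {
            // Defensive copy so that later changes by the caller do not leak into the function.
            this.conf = Collections.unmodifiableMap(new HashMap<>(conf));
        }

        protected Map<String, String> getConf() {
            return conf;
        }

        abstract String process(String input);
    }

    /** Example function that reads a per-operator "prefix" key (hypothetical key name). */
    static final class PrefixFunction extends ConfiguredFunction {
        PrefixFunction(Map<String, String> conf) {
            super(conf);
        }

        @Override
        String process(String input) {
            return getConf().getOrDefault("prefix", "") + input;
        }
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("prefix", "a:");
        Map<String, String> second = new HashMap<>();
        second.put("prefix", "b:");

        // Two instances of the same function type, each with its own configuration.
        System.out.println(new PrefixFunction(first).process("x"));
        System.out.println(new PrefixFunction(second).process("x"));
    }
}
```

In contrast to a single global parameter object, each instance here carries its own settings, which only matters if operators actually need different configurations.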




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744978#comment-14744978
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140301378
  
Storm only supports one global configuration that is shared across all 
spouts/bolts. So `GlobalJobParameters` will work just fine.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744355#comment-14744355
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r39452503
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
@@ -418,6 +418,17 @@ public void setBytes(String key, byte[] bytes) {
}
}
 
+   /**
+* Returns the clone of confData.
+*
+* @return the clone of confData
+*/
+   public HashMap getConfDataClone() {
+   synchronized (this.confData) {
+   return new HashMap(this.confData);
--- End diff --

This will just create a shallow copy, not a deep copy. Is that good enough?
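
To make the shallow-copy question concrete, here is a small self-contained Java sketch. It does not use Flink's `Configuration`; `ShallowCopyDemo` and its methods are hypothetical names. It shows what `new HashMap<>(source)` guarantees: the entry set of the copy is independent, but mutable value objects are still shared with the original.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ShallowCopyDemo {

    /** Shallow copy: a new map that references the same key and value objects. */
    static Map<String, Object> shallowCopy(Map<String, Object> source) {
        return new HashMap<>(source);
    }

    /** Adding or replacing entries in the copy does not affect the original map. */
    static boolean entriesAreIndependent() {
        Map<String, Object> original = new HashMap<>();
        original.put("parallelism", 4);

        Map<String, Object> copy = shallowCopy(original);
        copy.put("parallelism", 8);
        copy.put("extra", "x");

        return original.size() == 1 && Integer.valueOf(4).equals(original.get("parallelism"));
    }

    /** Mutating a mutable value through the copy is visible through the original. */
    static boolean mutableValueIsShared() {
        Map<String, Object> original = new HashMap<>();
        List<String> hosts = new ArrayList<>();
        hosts.add("host-a");
        original.put("hosts", hosts);

        Map<String, Object> copy = shallowCopy(original);
        @SuppressWarnings("unchecked")
        List<String> copiedHosts = (List<String>) copy.get("hosts");
        copiedHosts.add("host-b");

        return ((List<?>) original.get("hosts")).size() == 2;
    }

    public static void main(String[] args) {
        System.out.println("entries independent: " + entriesAreIndependent());
        System.out.println("mutable value shared: " + mutableValueIsShared());
    }
}
```

Whether this is good enough depends on the stored values: for immutable values such as strings and boxed numbers, a shallow copy behaves like a deep one.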




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744372#comment-14744372
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r39453328
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
 ---
@@ -91,7 +91,16 @@ public InputSplitProvider getInputSplitProvider() {
public Configuration getTaskStubParameters() {
return new 
TaskConfig(env.getTaskConfiguration()).getStubParameters();
}
-   
+
+   /**
+* Returns the job configuration.
+*
+* @return The job configuration.
+*/
+   public Configuration getJobConfiguration() {
+   return new Configuration(env.getJobConfiguration());
--- End diff --

The `JobConfiguration` is a system-internal configuration. I am not sure 
that it is a good idea to give access to it in the `StreamingRuntimeContext`. 
This change will be visible for all DataStream programs.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744362#comment-14744362
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r39453004
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
@@ -418,6 +418,17 @@ public void setBytes(String key, byte[] bytes) {
}
}
 
+   /**
+* Returns the clone of confData.
+*
+* @return the clone of confData
+*/
+   public HashMap getConfDataClone() {
+   synchronized (this.confData) {
+   return new HashMap(this.confData);
--- End diff --

Yes. That's fine.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744330#comment-14744330
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r39450937
  
--- Diff: docs/apis/storm_compatibility.md ---
@@ -169,6 +169,13 @@ The input type is `Tuple1` and 
`Fields("sentence")` specify that `input.
 
 See 
[BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java)
 and 
[BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java)
 for examples.  
 
+## Configure for embedded Spouts/Bolts
+Embedded Spouts/Bolts can be configure with user defined parameters.
+User defined parameters is stored in a `Map`(as in Storm).
+And this Map is provided as a parameter in the calls `Spout.open(...)` and 
`Bolt.prepare(...)`.
+Configuration can be used in storm topologies mode or flink mode.
--- End diff --

capitalize *Storm* and *Flink*




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744329#comment-14744329
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r39450885
  
--- Diff: docs/apis/storm_compatibility.md ---
@@ -169,6 +169,13 @@ The input type is `Tuple1` and 
`Fields("sentence")` specify that `input.
 
 See 
[BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java)
 and 
[BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java)
 for examples.  
 
+## Configure for embedded Spouts/Bolts
+Embedded Spouts/Bolts can be configure with user defined parameters.
+User defined parameters is stored in a `Map`(as in Storm).
--- End diff --

... *are* stored ...




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744399#comment-14744399
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140222747
  
Actually, I think going through the `TaskConfig` as proposed by @mjsax is 
the cleaner way. Going through the system-internal `JobConfiguration` and 
exposing it to user programs is not a good choice, in my opinion. 

The purpose of `TaskConfig` is exactly to give parameters to a task 
(function, spout). Also the `StreamingRuntimeContext` would not need to be 
adapted, because it already offers a method `getTaskStubParameters()`. Would 
that work as well or are there major issues preventing you from using the 
`TaskConfig`?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744411#comment-14744411
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140224226
  
It is not clear (at least to me) how to do this. The API does not offer an 
(obvious) way to set a configuration... (or I just don't get it). 
`StreamExecutionEnvironment` only offers `.getConfig()`, and there is no 
`.withParameters(...)` in the Streaming API (which is "deprecated" according 
to the discussion on the dev list).

IMHO, the best way would be the possibility to set a configuration in the 
environment that is distributed to all operators. Should we extend the 
Streaming API for this?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14742904#comment-14742904
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-139959540
  
@StephanEwen @rmetzger Could you take a look and check whether it can be 
merged? I am also working on Storm task hooks, which depend on this PR. Thank 
you very much!




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736615#comment-14736615
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-138865484
  
@mjsax Thanks. I have finished the changes for all comments.
@StephanEwen @rmetzger Could you take a look and check whether it can be 
merged? Thank you very much!




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734400#comment-14734400
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-138470052
  
@mjsax @StephanEwen @rmetzger I have finished the changes for all comments 
and updated the documentation. Could you take a look and check whether it can 
be merged? Thank you very much!




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734657#comment-14734657
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38914538
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
 ---
@@ -70,7 +92,7 @@ public void testRawType() throws Exception {

PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
-   Sets.newHashSet(new String[] { 
Utils.DEFAULT_STREAM_ID }));
+   Sets.newHashSet(new 
String[]{Utils.DEFAULT_STREAM_ID}));
--- End diff --

No formatting changes please.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734665#comment-14734665
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-138524518
  
I just had a few "cosmetic" comments. Otherwise it looks good to me to get 
merged.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734641#comment-14734641
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38913931
  
--- Diff: docs/apis/storm_compatibility.md ---
@@ -201,6 +201,26 @@ DataStream s2 = 
splitStream.select("s2").transform(/* use Bolt f
 
 See 
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java)
 for a full example.
 
+## Configure for embedded Spouts/Bolts
--- End diff --

Move this whole section one up.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734655#comment-14734655
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38914417
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
 ---
@@ -107,4 +112,39 @@ public static TopologyContext 
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs, 
bolts, null), taskToComponents, taskId);
}
 
+   /**
+* Get storm configuration from StreamingRuntimeContext.
+* @param ctx The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
+*  If configuration contains classes from the user code, 
it may lead to ClassNotFoundException..
--- End diff --

Typo. Two dots at the end.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734623#comment-14734623
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38913193
  
--- Diff: docs/apis/storm_compatibility.md ---
@@ -201,6 +201,26 @@ DataStream s2 = 
splitStream.select("s2").transform(/* use Bolt f
 
 See 
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java)
 for a full example.
 
+## Configure for embedded Spouts/Bolts
+Embedder spouts/bolts can be configure with user defined parameters. User 
defined parameters is a config `Map` and be called by `Spout.open(...)` and 
`Bolt.prepare()`
+as first parameter. Configuration can be used in storm topologies mode or 
flink mode.
+
+ 1.Storm topologies mode example
+
+...
+   Map conf = new HashMap();
--- End diff --

Can you please use the same code markup as in the other code examples?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734661#comment-14734661
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38914633
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
 ---
@@ -88,9 +88,23 @@ public static void main(final String[] args) throws 
Exception {
 
private static class ExclamationMap implements MapFunction {
 
+   private String exclamation;
+
+   public ExclamationMap() {
--- End diff --

Use `exclamationNum` as constructor parameter and build `exclamation` 
accordingly.
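
A minimal sketch of what this could look like, assuming `exclamationNum` is an `int` count of exclamation marks to append. To keep the sketch self-contained, `java.util.function.Function` stands in for Flink's `MapFunction`, and the class name `ExclamationMapSketch` is made up for illustration:

```java
import java.util.function.Function;

public class ExclamationMapSketch {

    /** Stand-in for the example's map function; Function replaces Flink's MapFunction here. */
    static final class ExclamationMap implements Function<String, String> {
        private final String exclamation;

        /** Builds the suffix once, from the number of exclamation marks requested. */
        ExclamationMap(int exclamationNum) {
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < exclamationNum; i++) {
                builder.append('!');
            }
            this.exclamation = builder.toString();
        }

        @Override
        public String apply(String value) {
            return value + this.exclamation;
        }
    }

    public static void main(String[] args) {
        // Appends three exclamation marks to the input.
        System.out.println(new ExclamationMap(3).apply("hello"));
    }
}
```

Because the suffix is built in the constructor, the function does not need to read any configuration when it is opened.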




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734648#comment-14734648
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38914053
  
--- Diff: docs/apis/storm_compatibility.md ---
@@ -201,6 +201,26 @@ DataStream s2 = 
splitStream.select("s2").transform(/* use Bolt f
 
 See 
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java)
 for a full example.
 
+## Configure for embedded Spouts/Bolts
+Embedder spouts/bolts can be configure with user defined parameters. User 
defined parameters is a config `Map` and be called by `Spout.open(...)` and 
`Bolt.prepare()`
+as first parameter. Configuration can be used in storm topologies mode or 
flink mode.
+
+ 1.Storm topologies mode example
--- End diff --

I don't think we need example code for embedded mode. (See "multiple output 
streams": If a whole topology is executed using FlinkTopologyBuilder etc., 
there is no special attention required – it works as in regular Storm.) A 
simple sentence in the paragraph should be sufficient.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733433#comment-14733433
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38844802
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
 ---
@@ -78,6 +86,11 @@ public static void main(final String[] args) throws 
Exception {
exclaimed.print();
}
 
+   // set bolt and map exclamation marks num
+   Configuration conf = new Configuration();
+   conf.setInteger(new String("exclamationNum"), exclamationNum);
--- End diff --

Why are you creating Strings like this?
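
For context, a small JDK-only sketch of the difference in question: `new String("...")` allocates a separate copy of a literal that is already available as a constant, so the plain literal is normally enough for a configuration key. The class and method names are made up for illustration:

```java
public class StringKeyExample {

    // Builds the key the way the reviewed diff does: an explicit copy of the literal.
    static String copiedKey() {
        return new String("exclamationNum");
    }

    // Uses the literal directly; no extra object is allocated per call.
    static String literalKey() {
        return "exclamationNum";
    }

    public static void main(String[] args) {
        String copy = copiedKey();
        String literal = literalKey();
        System.out.println(copy.equals(literal)); // same characters
        System.out.println(copy == literal);      // but a distinct object
    }
}
```

Both keys compare equal with `equals`, so lookups behave the same; the copy only adds an allocation.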




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733432#comment-14733432
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38844654
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
 ---
@@ -222,6 +242,64 @@ public void testOpenSink() throws Exception {
 
@SuppressWarnings("unchecked")
@Test
+   public void testOpenWithStormConf() throws Exception {
+   final IRichBolt bolt = mock(IRichBolt.class);
+   final StormBoltWrapper wrapper = new 
StormBoltWrapper(bolt);
+
+   Map stormConf = new HashMap();
+   stormConf.put(new String("path"), new 
String("/home/user/file.txt"));
--- End diff --

Just use `"path"` here.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733429#comment-14733429
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38844529
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
 ---
@@ -107,4 +112,40 @@ public static TopologyContext 
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs, 
bolts, null), taskToComponents, taskId);
}
 
+   /**
+* Get storm configuration from StreamingRuntimeContext.
+*
+* @param ctx
+*The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
+*/
+   public static Map getStormConfFromContext(final RuntimeContext ctx)
+   throws Exception {
+   Map stormConf = null;
+   if (ctx instanceof StreamingRuntimeContext) {
+   Configuration jobConfiguration = 
((StreamingRuntimeContext) ctx).getJobConfiguration();
+
+   if (jobConfiguration != null) {
+   /* topologies mode */
+   stormConf = (Map) 
InstantiationUtil.readObjectFromConfig(jobConfiguration, 
StormConfig.STORM_DEFAULT_CONFIG, Map.class.getClassLoader());
--- End diff --

Since the map is untyped, users might put arbitrary objects into the Map, 
including instances of classes from the user code.
This would lead to `ClassNotFoundException`s when running the code on 
clusters. Can you use the classloader of `StormWrapperSetupHelper`?
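
To illustrate the concern, here is a JDK-only sketch of deserializing a map with an explicitly chosen class loader. It is not Flink's `InstantiationUtil`; the names `ClassLoaderAwareDeserialization` and `LoaderObjectInputStream` are made up for the example. For JDK classes such as `Map`, `getClassLoader()` typically returns `null` (the bootstrap loader), which cannot see application or user classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.HashMap;
import java.util.Map;

public class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes through a caller-supplied class loader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Only locate the class; do not run static initializers yet.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup (covers primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> conf = new HashMap<>();
        conf.put("path", "/home/user/file.txt");
        conf.put("delimitSize", 1024);

        // Use the loader of an application class rather than Map.class.getClassLoader().
        ClassLoader loader = ClassLoaderAwareDeserialization.class.getClassLoader();
        Object restored = deserialize(serialize(conf), loader);
        System.out.println(restored.equals(conf));
    }
}
```

The loader of a class that ships with the job can resolve both JDK classes (through delegation) and classes packaged alongside it, which is the point of the request above.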




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733670#comment-14733670
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-138292133
  
@mjsax @StephanEwen @rmetzger I have finished addressing all comments. 
Could you take a look and check whether it can be merged? Thank you very much!




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733522#comment-14733522
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-138267349
  
@rmetzger Thanks. I now use the classloader of `StormWrapperSetupHelper` 
instead of that of `Map`, and I changed the way the Strings are created.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733676#comment-14733676
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38860604
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
 ---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
 
@SuppressWarnings("unchecked")
@Test
+   public void testOpenWithStormConf() throws Exception {
+   final IRichBolt bolt = mock(IRichBolt.class);
+   final StormBoltWrapper wrapper = new 
StormBoltWrapper(bolt);
+
+   Configuration jobConfiguration = new Configuration();
+   jobConfiguration.setString(new String("path"), new 
String("/home/user/file.txt"));
+   jobConfiguration.setInteger(new String("delimitSize"), 1024);
+   Environment env = new RuntimeEnvironment(new JobID(), new 
JobVertexID(), new ExecutionAttemptID(),
+   new String(), new String(), 1, 2, 
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+   mock(MemoryManager.class), 
mock(IOManager.class), mock(BroadcastVariableManager.class),
+   mock(AccumulatorRegistry.class), 
mock(InputSplitProvider.class), mock(Map.class),
+   new ResultPartitionWriter[1], new InputGate[1], 
mock(ActorGateway.class),
+   mock(TaskManagerRuntimeInfo.class));
+   StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, 
new ExecutionConfig(),
+   mock(KeySelector.class),
+   mock(StateHandleProvider.class), 
mock(Map.class));
+
+   wrapper.setup(mock(Output.class), ctx);
+   wrapper.open(mock(Configuration.class));
--- End diff --

Now I understand. This is the unused `TaskConfiguration`.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733796#comment-14733796
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38866736
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
 ---
@@ -107,4 +112,40 @@ public static TopologyContext 
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs, 
bolts, null), taskToComponents, taskId);
}
 
+   /**
+* Get storm configuration from StreamingRuntimeContext.
+*
+* @param ctx
+*The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
--- End diff --

JavaDoc incomplete




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733773#comment-14733773
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38865859
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -97,9 +102,20 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
 
@Override
+   public void open(Configuration parameters) throws Exception {
+   config = new HashMap();
+
+   /* parameters is task configuration, we can get storm 
configuration only from job configuration */
+   Map stormConf = 
StormWrapperSetupHelper.getStormConfFromContext(super.getRuntimeContext());
+   if (stormConf != null) {
+   config.putAll(stormConf);
+   }
+   }
--- End diff --

I just had a closer look at the code. Because we do not use the given 
`Configuration`, we should move the code into the `run()` method and remove 
the override of `open()`.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733775#comment-14733775
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38865906
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -57,6 +58,10 @@
 * Indicates, if the source is still running or was canceled.
 */
protected volatile boolean isRunning = true;
+   /**
+* The job configuration which include storm configuration.
+*/
+   protected Map config;
--- End diff --

Can be removed, if `open()` is not used.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733788#comment-14733788
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38866357
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
 ---
@@ -107,4 +112,40 @@ public static TopologyContext 
convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs, 
bolts, null), taskToComponents, taskId);
}
 
+   /**
+* Get storm configuration from StreamingRuntimeContext.
+*
+* @param ctx
+*The RuntimeContext of operator.
+* @return The storm configuration map.
+* @throws Exception
+*/
+   public static Map getStormConfFromContext(final RuntimeContext ctx)
+   throws Exception {
+   Map stormConf = null;
--- End diff --

Please add a test for this method.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733809#comment-14733809
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38867651
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
 ---
@@ -85,12 +103,43 @@ public static void main(final String[] args) throws 
Exception {
// USER FUNCTIONS
// 
*
 
-   private static class ExclamationMap implements MapFunction {
+   private static class ExclamationMap extends AbstractRichFunction 
implements MapFunction {
private static final long serialVersionUID = 
-684993133807698042L;
+   private String exclamation;
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   RuntimeContext ctx = super.getRuntimeContext();
--- End diff --

Same here.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733813#comment-14733813
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38867892
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
 ---
@@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
super.open(conf, context, collector);
try {
+   /*  If config is given, it should override constructor 
path  */
+   if (conf != null && conf.containsKey("textpath")) {
+   this.path = (String)conf.get("textpath");
+   }
+
this.reader = new BufferedReader(new FileReader(this.path));
} catch (final FileNotFoundException e) {
throw new RuntimeException(e);
+   } catch (final NullPointerException e) {
--- End diff --

Why do you catch `NullPointerException`? This catch block can be removed. NPE 
is an unchecked runtime exception anyway, and you just re-throw it.
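
The review points on this hunk (a path from the config overrides the constructor path, and no `catch` is needed for `NullPointerException`) can be sketched as follows. This is a minimal, JDK-only sketch under stated assumptions: `FileSpoutSketch` is a hypothetical stand-in for `StormFileSpout`, its `open(Map)` omits Storm's `TopologyContext` and `SpoutOutputCollector` parameters, and only the `"textpath"` key is taken from the diff above.

```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

// Hypothetical stand-in for StormFileSpout; not the class from the pull request.
class FileSpoutSketch {
    private String path;
    private BufferedReader reader;

    FileSpoutSketch(final String path) {
        this.path = path;
    }

    // Simplified signature: Storm's open(...) also takes a context and a collector.
    @SuppressWarnings("rawtypes")
    void open(final Map conf) {
        // A path given in the config overrides the constructor path.
        if (conf.containsKey("textpath")) {
            this.path = (String) conf.get("textpath");
        }
        try {
            this.reader = new BufferedReader(new FileReader(this.path));
        } catch (final FileNotFoundException e) {
            // Checked exception, so it has to be wrapped. A NullPointerException is
            // unchecked and propagates on its own; no catch block is needed for it.
            throw new RuntimeException(e);
        }
    }

    String nextLine() throws IOException {
        return this.reader.readLine();
    }

    void close() throws IOException {
        if (this.reader != null) {
            this.reader.close();
        }
    }
}
```

With this shape, constructing the spout with one path and passing a different `"textpath"` in the config makes the spout read the configured file.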



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733811#comment-14733811
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38867724
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
 ---
@@ -31,10 +31,22 @@
 public class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
 
+   private String exclamation;
+
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
+   exclamation = "!!!";
+
+   if (conf != null && conf.containsKey("exclamationNum")) {
--- End diff --

We can safely assume that `conf != null`. No check is needed.
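
A minimal sketch of the simplified check, assuming the config map is never null. `ExclamationBoltSketch` is a hypothetical, JDK-only stand-in for `ExclamationBolt` (Storm's context and collector are omitted), and treating `"exclamationNum"` as an `Integer` count of `!` characters is an assumption, since the diff does not show how the value is used.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ExclamationBolt; Storm's context and collector are omitted.
class ExclamationBoltSketch {
    private String exclamation;

    @SuppressWarnings("rawtypes")
    void prepare(final Map conf) {
        // Default used when the key is absent.
        exclamation = "!!!";
        // conf is assumed to be non-null, so only the key lookup is needed.
        if (conf.containsKey("exclamationNum")) {
            // Assumption: the value is an Integer giving the number of '!' to append.
            final int count = (Integer) conf.get("exclamationNum");
            final StringBuilder builder = new StringBuilder();
            for (int i = 0; i < count; ++i) {
                builder.append('!');
            }
            exclamation = builder.toString();
        }
    }

    String process(final String word) {
        return word + exclamation;
    }

    public static void main(final String[] args) {
        final Map<String, Object> conf = new HashMap<>();
        conf.put("exclamationNum", 5);
        final ExclamationBoltSketch bolt = new ExclamationBoltSketch();
        bolt.prepare(conf);
        System.out.println(bolt.process("storm")); // prints "storm!!!!!"
    }
}
```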



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733812#comment-14733812
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38867760
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
 ---
@@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
super.open(conf, context, collector);
try {
+   /*  If config is given, it should override constructor 
path  */
+   if (conf != null && conf.containsKey("textpath")) {
+   this.path = (String)conf.get("textpath");
--- End diff --

`conf != null` can be removed.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733805#comment-14733805
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38867413
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
 ---
@@ -86,11 +99,43 @@ public static void main(final String[] args) throws Exception {
// USER FUNCTIONS
// 
*
 
-   private static class ExclamationMap implements MapFunction {
+   private static class ExclamationMap extends AbstractRichFunction implements MapFunction {
+
+   private String exclamation;
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   RuntimeContext ctx = super.getRuntimeContext();
--- End diff --

You should not use `JobConfiguration` for regular Flink functions. 
Instantiate `exclamation` via the constructor argument `exclamationNum`.
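
A minimal sketch of the constructor-based variant suggested here. It is JDK-only: `java.util.function.Function` stands in for Flink's `MapFunction`, the class name `ExclamationMapSketch` is hypothetical, and interpreting `exclamationNum` as the number of `!` characters is an assumption.

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-in: java.util.function.Function replaces Flink's MapFunction here.
class ExclamationMapSketch implements Function<String, String>, Serializable {
    private static final long serialVersionUID = 1L;

    private final String exclamation;

    // The count arrives as a constructor argument, so neither open(...) nor the
    // runtime context is needed to configure the function.
    ExclamationMapSketch(final int exclamationNum) {
        final StringBuilder builder = new StringBuilder();
        for (int i = 0; i < exclamationNum; ++i) {
            builder.append('!');
        }
        this.exclamation = builder.toString();
    }

    @Override
    public String apply(final String value) {
        return value + this.exclamation;
    }
}
```

The function is fully configured when it is constructed on the client, and the field travels with the serialized function instance, so no job-wide configuration is involved.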



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14733821#comment-14733821
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-138320697
  
Can you update the documentation, too? For README.md, just delete the line that 
claims configuration is not supported. The web page documentation should contain a 
short paragraph on how embedded Spouts/Bolts can be configured (for whole 
topologies it works as in Storm, i.e., we don't need to cover this in the 
documentation).



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14732295#comment-14732295
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-138051794
  
@mjsax @StephanEwen Thanks. I have finished the changes to the core classes. 
When a user calls env.getConfig().setGlobalJobParameters(conf); to set the Storm 
configuration, I need to convert the Configuration into a map. toMap() 
is not enough, so I added getConfDataClone() to get a clone of the inner config 
hashmap.
I also had a look at how to use TaskConfiguration instead of JobConfiguration.
The TaskConfiguration transfer path is:
TaskConfiguration <-- TaskDeploymentDescriptor <-- JobVertex <-- StreamConfig
In StreamingJobGraphGenerator.java, setVertexConfig() sets the 
TaskConfiguration like this:
...
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);

config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
...
If we want to use TaskConfiguration instead of JobConfiguration, we need to 
add a special interface to set stormConfig in StreamGraph and StreamConfig.
I think this may break the separation between the storm-compatibility and 
core code.
What is your opinion?



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730868#comment-14730868
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-137748262
  
Concerning the changes at the core classes:
  - The storm config key is an application specific key and not part of the 
system configuration, therefore it should be defined as part of the application 
code.
  - When returning the JobManager configuration, this should return an 
unmodifiable configuration, so the internal config cannot be altered.
  - Do you need access to the inner config hashmap? How about storing the 
storm specific properties in a key like this: 
`InstantiationUtil.writeObjectToConfig(stormProps, config, "STORM_CONF");`
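
The idea in the last two points (store the Storm properties as one serialized object under an application-defined key, and hand out only an unmodifiable view) can be sketched with the JDK alone. This is not Flink's implementation: a plain `Map<String, String>` stands in for `Configuration`, Java serialization plus Base64 stands in for `InstantiationUtil.writeObjectToConfig`, and the names `StormConfSketch`, `writeStormConf`, and `readStormConf` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// JDK-only sketch; a plain Map<String, String> stands in for Flink's Configuration.
class StormConfSketch {
    // Application-defined key, kept out of the system-wide config constants.
    static final String STORM_CONF_KEY = "STORM_CONF";

    // Serializes the whole Storm property map into a single config entry.
    static void writeStormConf(final HashMap<String, Object> stormProps,
            final Map<String, String> config) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(stormProps);
        }
        config.put(STORM_CONF_KEY, Base64.getEncoder().encodeToString(bytes.toByteArray()));
    }

    // Reads the map back and returns it as an unmodifiable view.
    @SuppressWarnings("unchecked")
    static Map<String, Object> readStormConf(final Map<String, String> config)
            throws IOException, ClassNotFoundException {
        final byte[] raw = Base64.getDecoder().decode(config.get(STORM_CONF_KEY));
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return Collections.unmodifiableMap((Map<String, Object>) in.readObject());
        }
    }
}
```

Because the whole map lives under one key, the wrapper never needs access to the inner hashmap of the configuration object.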



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730843#comment-14730843
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-137742282
  
I just had a quick look over it, and so far I like it. Two things are 
open for discussion. I am not sure if the change to `ConfigConstants` is a good 
choice. Also, would it be feasible to use `TaskConfiguration` instead of 
`JobConfiguration`? This would make the code cleaner from my point of view.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14730869#comment-14730869
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-137748448
  
The changes suggested above help with a cleaner separation between the 
application (here storm compatibility) and the core code.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14725881#comment-14725881
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38456958
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
 
@Override
+   public void open(Configuration parameters) throws Exception {
+   stormConf = new HashMap();
+
+   /* parameters is task configuration, we can get storm 
configuration only from job configuration */
+   RuntimeContext ctx = super.getRuntimeContext();
+   if (ctx instanceof StreamingRuntimeContext) {
+   Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
+
--- End diff --

I see. Would it be better to add the Storm config to the task config to 
simplify the code in the `open(...)` method? For `FlinkTopology`, the same config 
would be added to each Spout/Bolt. At least for embedded mode, using the task 
config instead of the job config would be more appropriate IMHO.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14724980#comment-14724980
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136636200
  
@mjsax @StephanEwen I have finished the code changes. Could you give me some 
comments? Thank you very much!



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14724717#comment-14724717
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38382368
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
 
@Override
+   public void open(Configuration parameters) throws Exception {
+   stormConf = new HashMap();
+
+   /* parameters is task configuration, we can get storm 
configuration only from job configuration */
+   RuntimeContext ctx = super.getRuntimeContext();
+   if (ctx instanceof StreamingRuntimeContext) {
+   Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
+
--- End diff --

open() is usually called by openAllOperators(), and its Configuration 
parameter is usually the task configuration, not the job configuration. 



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723153#comment-14723153
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136289777
  
This is the stack trace (it occurs in 4 of 5 runs; the other run failed earlier 
due to an unrelated test). It seems you broke something.
```
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 5.149 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase

testJobWithoutObjectReuse(org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase)  Time elapsed: 3.825 sec  <<< FAILURE!
java.lang.AssertionError: Error while calling the test program: Job execution failed.
    at org.junit.Assert.fail(Assert.java:88)
    at org.apache.flink.streaming.util.StreamingProgramTestBase.testJobWithoutObjectReuse(StreamingProgramTestBase.java:102)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
```
I will review after you have fixed it.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723209#comment-14723209
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136302889
  
Travis runs on Linux. There is only a single ":" in the path there.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723212#comment-14723212
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136304296
  
Oh, so not all test cases can run successfully on Windows? 
I have changed this and committed again.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723219#comment-14723219
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136305354
  
This might be the case. I never tried it. And as far as I know, all 
developers work on Linux or Mac, so this was never an issue.



[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723206#comment-14723206
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136301903
  
@mjsax Thanks.
I have a question:
On my Windows machine, the textPath of ExclamationWithStormSpoutITCase is 
file:/C:/Users/xxx/AppData/Local/Temp/org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase-text.txt.
 After splitting by ":", we get the path 
/Users/xxx/AppData/Local/Temp/org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase-text.txt,
 and StormFileSpout cannot open this path. But CI ran successfully before. 
Do you know why?
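
A minimal sketch of the behaviour described above, not the code from the pull request. It assumes the path is taken as the last element after splitting on ":", and contrasts that with parsing the string as a `java.net.URI`. The class and method names are hypothetical, and the file name is shortened for readability.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not part of the pull request.
final class FileUriPathSketch {

    // Naive approach (assumed): keep whatever follows the last ':'.
    static String pathBySplit(String uri) {
        String[] parts = uri.split(":");
        return parts[parts.length - 1];
    }

    // Alternative: parse the string as a URI and take its path component.
    static String pathByUri(String uri) {
        return URI.create(uri).getPath();
    }

    public static void main(String[] args) {
        String windows = "file:/C:/Users/xxx/AppData/Local/Temp/text.txt";
        String unix = "file:/tmp/text.txt";
        // The Windows drive letter is lost by the split, but kept by the URI parse.
        System.out.println(pathBySplit(windows));
        System.out.println(pathByUri(windows));
        // With a single ':' both approaches return the same path.
        System.out.println(pathBySplit(unix));
        System.out.println(pathByUri(unix));
    }
}
```

On Linux or Mac style paths the string contains only one ":", so both approaches agree, which may be why the problem did not show up on CI.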






[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723413#comment-14723413
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312609
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
 ---
@@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
public void open(final Map conf, final TopologyContext context, final 
SpoutOutputCollector collector) {
super.open(conf, context, collector);
try {
+   /* if inputFile path has not set, get it from storm 
configuration  */
+   if (this.path == null && conf != null && 
conf.containsKey(new String("textpath"))) {
--- End diff --

I would not check for `this.path == null`. If config is given, it should 
override constructor path.
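
A minimal sketch of the suggested precedence, assuming the key `"textpath"` from the diff above: a value in the Storm config map wins over the constructor argument, and the constructor argument is only the fallback. The class and method names are hypothetical and the sketch does not depend on Storm or Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not the actual StormFileSpout code.
final class PathResolutionSketch {

    // Key name taken from the diff under review.
    static final String TEXT_PATH_KEY = "textpath";

    // Returns the configured path if present, otherwise the constructor path.
    static String resolvePath(String constructorPath, Map<?, ?> conf) {
        if (conf != null) {
            Object configured = conf.get(TEXT_PATH_KEY);
            if (configured != null) {
                return configured.toString();
            }
        }
        return constructorPath;
    }

    public static void main(String[] args) {
        Map<Object, Object> conf = new HashMap<>();
        conf.put(TEXT_PATH_KEY, "/tmp/from-config.txt");
        System.out.println(resolvePath("/tmp/from-constructor.txt", conf));
        System.out.println(resolvePath("/tmp/from-constructor.txt", null));
    }
}
```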




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723404#comment-14723404
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312112
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
 ---
@@ -86,11 +99,40 @@ public static void main(final String[] args) throws 
Exception {
// USER FUNCTIONS
// 
*
 
-   private static class ExclamationMap implements MapFunction {
+   private static class ExclamationMap extends AbstractRichFunction 
implements MapFunction {
+
+   private String exclamation;
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   exclamation = new String("!!!");
--- End diff --

Move this to a new `else` branch in line 121 to make clear that it is the 
default value in case no config is given.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723406#comment-14723406
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312165
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
 ---
@@ -101,20 +143,23 @@ public String map(String value) throws Exception {
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
+   private static int exclamationNum;
 
private static boolean parseParameters(final String[] args) {
 
if (args.length > 0) {
// parse input arguments
fileOutput = true;
-   if (args.length == 2) {
+   if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+   exclamationNum = Integer.parseInt(args[2]);
} else {
System.err.println("Usage: 
ExclamationWithStormBolt  ");
return false;
}
} else {
+   exclamationNum = 3;
System.out.println("Executing ExclamationWithStormBolt 
example with built-in default data");
System.out.println("  Provide parameters to read input 
data from a file");
System.out.println("  Usage: ExclamationWithStormBolt 
 ");
--- End diff --

Extend the parameter list.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723408#comment-14723408
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312340
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
 ---
@@ -85,8 +103,36 @@ public static void main(final String[] args) throws 
Exception {
// USER FUNCTIONS
// 
*
 
-   private static class ExclamationMap implements MapFunction {
+   private static class ExclamationMap extends AbstractRichFunction 
implements MapFunction {
private static final long serialVersionUID = 
-684993133807698042L;
+   private String exclamation;
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   exclamation = new String("!!!");
--- End diff --

Why not use the input parameter?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723407#comment-14723407
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312308
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
 ---
@@ -101,20 +147,23 @@ public String map(String value) throws Exception {
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
+   private static int exclamationNum;
 
private static boolean parseParameters(final String[] args) {
 
if (args.length > 0) {
// parse input arguments
fileOutput = true;
-   if (args.length == 2) {
+   if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+   exclamationNum = Integer.parseInt(args[2]);
} else {
System.err.println("Usage: 
ExclamationWithStormSpout  ");
--- End diff --

Extend the parameter list.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723401#comment-14723401
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38311967
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
 ---
@@ -205,7 +211,35 @@ public void open(final Configuration parameters) 
throws Exception {
this.numberOfAttributes, 
flinkCollector));
}
 
-   this.bolt.prepare(null, topologyContext, stormCollector);
+   Map stormConf = new HashMap();
--- End diff --

Seems to be the same code as in `AbstractStormSpoutWrapper.open(...)`. Can 
you unify it, i.e., add a new static method to `StormWrapperSetupHelper` that 
does it, and just call it here?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723414#comment-14723414
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312802
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -275,7 +275,14 @@
 * Path to Hadoop configuration
 */
public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
-   
+
+   //  Storm Configuration 
+
+   /**
+* storm configuration
+*/
+   public static final String STORM_DEFAULT_CONFIG = "storm.config";
--- End diff --

I think we should put this somewhere else... Maybe add a new class in the 
`stormcompatibility.util` package. @StephanEwen what is your opinion?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723400#comment-14723400
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38311822
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
 
@Override
+   public void open(Configuration parameters) throws Exception {
+   stormConf = new HashMap();
+
+   /* parameters is task configuration, we can get storm 
configuration only from job configuration */
+   RuntimeContext ctx = super.getRuntimeContext();
+   if (ctx instanceof StreamingRuntimeContext) {
+   Configuration jobConfiguration = 
((StreamingRuntimeContext) ctx).getJobConfiguration();
+
--- End diff --

Why do you not use the function input parameter `open(Configuration 
parameters)`? Is it different?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723411#comment-14723411
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312505
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
 ---
@@ -35,6 +35,10 @@
private String line;
private boolean newLineRead;
 
+   public FiniteStormFileSpout() {
+   super();
--- End diff --

Not necessary; `super()` is called automatically. Make it a single line: 
`public FiniteStormFileSpout() {}`
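
A minimal sketch of the Java rule behind this comment: when a constructor does not begin with an explicit `this(...)` or `super(...)` call, the compiler inserts a call to the no-argument superclass constructor. The two classes below are hypothetical stand-ins, not the real spout classes.

```java
// Hypothetical stand-ins for the spout classes; names are illustrative only.
class BaseFileSpoutSketch {
    protected String path;
    boolean baseConstructorRan = false;

    BaseFileSpoutSketch() {
        baseConstructorRan = true;
    }

    BaseFileSpoutSketch(String path) {
        this();
        this.path = path;
    }
}

class FiniteFileSpoutSketch extends BaseFileSpoutSketch {

    // No explicit super() call: the compiler adds it implicitly.
    FiniteFileSpoutSketch() {}

    public static void main(String[] args) {
        // Shows whether the base constructor ran for the subclass instance.
        System.out.println(new FiniteFileSpoutSketch().baseConstructorRan);
    }
}
```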




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723405#comment-14723405
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r38312152
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
 ---
@@ -101,20 +143,23 @@ public String map(String value) throws Exception {
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
+   private static int exclamationNum;
 
private static boolean parseParameters(final String[] args) {
 
if (args.length > 0) {
// parse input arguments
fileOutput = true;
-   if (args.length == 2) {
+   if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+   exclamationNum = Integer.parseInt(args[2]);
} else {
System.err.println("Usage: 
ExclamationWithStormBolt  ");
--- End diff --

Extend the parameter list.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14721918#comment-14721918
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136241503
  
@mjsax @StephanEwen I have finished the code changes:
1. Serialize the Storm config as a byte[] into the Flink configuration.
2. Extend ExclamationTopology such that the number of added '!' in 
ExclamationBolt and ExclamationWithStormSpout.ExclamationMap is configurable, 
and adapt the tests.
3. Extend FiniteStormFileSpout and its base class with an empty constructor, 
and configure the file to be opened via the Storm configuration Map.
I have run the flink-storm-compatibility tests successfully on my local 
machine and do not know why CI failed.
Can you have a look at my code? Thank you very much.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716575#comment-14716575
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135406207
  
Sorry, but I don't understand your question...


 Add configuration support in Storm-compatibility
 

 Key: FLINK-2525
 URL: https://issues.apache.org/jira/browse/FLINK-2525
 Project: Flink
  Issue Type: New Feature
  Components: flink-contrib
Reporter: fangfengbin
Assignee: fangfengbin

 Spouts and Bolt are initialized by a call to `Spout.open(...)` and 
 `Bolt.prepare()`, respectively. Both methods have a config `Map` as first 
 parameter. This map is currently not populated. Thus, Spouts and Bolts cannot 
 be configure with user defined parameters. In order to support this feature, 
 spout and bolt wrapper classes need to be extended to create a proper `Map` 
 object. Furthermore, the clients need to be extended to take a `Map`, 
 translate it into a Flink `Configuration` that is forwarded to the wrappers 
 for proper initialization of the map.





[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716603#comment-14716603
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135411850
  
Can you try to serialize the whole `Map` into a single `byte[]`?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716605#comment-14716605
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135412789
  
Oh, you are right. Serializing the whole Map into a single byte[] is 
better. Thanks.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716525#comment-14716525
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135399187
  
I think the `byte[]` conversion approach is the correct way to go. Storm 
config keys need not be `String`, thus the specific prefix trick cannot be 
applied.
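
A minimal sketch of why non-`String` keys are a problem for a prefix scheme. It assumes the "prefix trick" means storing each Storm entry under a prefixed `String` key, which is an interpretation and not confirmed by the thread. The class name, method name, and the prefix `"storm."` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not code from the pull request.
final class PrefixTrickSketch {

    // Flattens arbitrary keys to prefixed String keys (the assumed "prefix trick").
    static Map<String, Object> flattenWithPrefix(Map<Object, Object> stormConf, String prefix) {
        Map<String, Object> flat = new HashMap<>();
        for (Map.Entry<Object, Object> entry : stormConf.entrySet()) {
            flat.put(prefix + entry.getKey(), entry.getValue());
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<Object, Object> stormConf = new HashMap<>();
        stormConf.put(1, "value for Integer key");
        stormConf.put("1", "value for String key");
        // Two distinct keys collapse into the single key "storm.1",
        // and the original key types cannot be recovered.
        System.out.println(stormConf.size());
        System.out.println(flattenWithPrefix(stormConf, "storm.").size());
    }
}
```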




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716551#comment-14716551
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135403187
  
@StephanEwen Thanks. The key of a Storm config entry is an Object, so maybe 
the confData (HashMap<String, Object>) of Configuration is not enough.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716555#comment-14716555
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135403521
  
@ffbin: Can you try whether you can simply put the serialized Storm Config as 
a byte[] into the Flink configuration? You can then unpack it inside the Storm 
code, when needed.
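
A minimal sketch of the round trip suggested here, using plain Java serialization. It assumes every key and value in the Storm config is `Serializable`. Storing the resulting `byte[]` in the Flink configuration is left out so that the sketch has no Flink dependency, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not the code from the pull request.
final class StormConfBytesSketch {

    // Packs the whole map, including non-String keys, into one byte[].
    static byte[] toBytes(Map<Object, Object> stormConf) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            // Copy into a HashMap, which is Serializable.
            out.writeObject(new HashMap<Object, Object>(stormConf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Unpacks the map again on the receiving side.
    @SuppressWarnings("unchecked")
    static Map<Object, Object> fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<Object, Object>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<Object, Object> stormConf = new HashMap<>();
        stormConf.put("textpath", "/tmp/text.txt");
        stormConf.put(42, "non-String key");
        Map<Object, Object> restored = fromBytes(toBytes(stormConf));
        System.out.println(restored.equals(stormConf));
    }
}
```

Because the map travels as a single opaque value, the key types survive the round trip.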




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716571#comment-14716571
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135405543
  
@mjsax Hi. I want to make the number of '!' added in ExclamationBolt and 
ExclamationWithStormSpout.ExclamationMap configurable via the prepare() / open() 
functions. The number can be read from the jobConfiguration. What is your 
suggestion? Thanks.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14714436#comment-14714436
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135085374
  
Putting a nested stormConf into the configuration seems just wrong, 
sorry. Such a specific hack in a generic utility cannot yield maintainable code.

Why is that needed in the first place? Why not have a dedicated 
configuration object for storm?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713053#comment-14713053
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37976623
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
 ---
@@ -52,6 +77,38 @@ public void testRunExecuteCancelInfinite() throws Exception {
    }
 
    @Test
+   public void testOpen() throws Exception {
+       final IRichSpout spout = mock(IRichSpout.class);
+       final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
+
+       Map stormConf = new HashMap();
+       stormConf.put(new String("path"), new String("/home/user/file.txt"));
+       stormConf.put(1, 1024);
+       Configuration jobConfiguration = new Configuration();
+       jobConfiguration.putStormConf(stormConf);
+       Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+               new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+               mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
+               mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
+               new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
+               mock(TaskManagerRuntimeInfo.class));
+       StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(),
+               mock(KeySelector.class),
+               mock(StateHandleProvider.class), mock(Map.class));
+
+       spoutWrapper.setRuntimeContext(runtimeContext);
+       spoutWrapper.open(mock(Configuration.class));
+       final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class);
+       spoutWrapper.cancel();
+       spoutWrapper.run(ctx);
+
+       Map mapExpect = new HashMap();
+       mapExpect.put(new String("path"), new String("/home/user/file.txt"));
+       mapExpect.put(1, 1024);
+       verify(spout).open(eq(mapExpect), any(TopologyContext.class), any(SpoutOutputCollector.class));
--- End diff --

Use `stormConf` instead of `mapExpect`




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713050#comment-14713050
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37976536
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
 ---
@@ -222,6 +240,36 @@ public void testOpenSink() throws Exception {
 
    @SuppressWarnings("unchecked")
    @Test
+   public void testOpenWithStormConf() throws Exception {
+       final IRichBolt bolt = mock(IRichBolt.class);
+       final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+
+       Map stormConf = new HashMap();
+       stormConf.put(new String("path"), new String("/home/user/file.txt"));
+       stormConf.put(1, 1024);
+       Configuration jobConfiguration = new Configuration();
+       jobConfiguration.putStormConf(stormConf);
+       Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+               new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+               mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
+               mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
+               new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
+               mock(TaskManagerRuntimeInfo.class));
+       StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
+               mock(KeySelector.class),
+               mock(StateHandleProvider.class), mock(Map.class));
+
+       wrapper.setup(mock(Output.class), ctx);
+       wrapper.open(mock(Configuration.class));
+
+       Map mapExpect = new HashMap();
+       mapExpect.put(new String("path"), new String("/home/user/file.txt"));
+       mapExpect.put(1, 1024);
+       verify(bolt).prepare(eq(mapExpect), any(TopologyContext.class), any(OutputCollector.class));
--- End diff --

Remove `mapExpect` and use `stormConf` from above to avoid code 
duplication.
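
Reusing `stormConf` in the verification works because argument matching by equality compares map contents, not object identity. A small plain-Java sketch of that property follows (no Mockito involved; the class `MapEqualityDemo` is hypothetical and only illustrates `Map.equals`):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo class; illustrates content-based map equality only. */
final class MapEqualityDemo {

    /** Returns true if a copied map equals the original although it is a different instance. */
    static boolean sameContentDifferentInstance() {
        Map<Object, Object> stormConf = new HashMap<>();
        stormConf.put("path", "/home/user/file.txt");
        stormConf.put(1, 1024);

        // What a wrapper might hand to the spout or bolt: a fresh map with the same entries.
        Map<Object, Object> received = new HashMap<>(stormConf);

        return received != stormConf && received.equals(stormConf);
    }

    public static void main(String[] args) {
        System.out.println(sameContentDifferentInstance());
    }
}
```

A second, hand-built expected map therefore adds nothing over the map that was put into the configuration.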




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713352#comment-14713352
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135006223
  
@ffbin Can you extend `ExclamationTopology` such that the number of added 
`!` in `ExclamationBolt` and `ExclamationWithStormSpout.ExclamationMap` is 
configurable, and adapt the tests accordingly? Please add an additional user 
parameter to `ExclamationWithStormSpout` and `ExclamationWithStormBolt`. 
Furthermore, please **extend** `FiniteStormFileSpout` or its base class with an 
empty constructor and, for this case, configure the file to be opened via the 
Storm configuration Map.
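
The request above could look roughly like the following sketch. It is a plain-Java stand-in, not the actual `ExclamationBolt`: the class `ConfigurableExclamation`, the key `exclamation.count`, and the default of 3 are assumptions made for illustration, and the class does not implement Storm's `IRichBolt`:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a bolt; it does not implement Storm's IRichBolt. */
final class ConfigurableExclamation {

    /** Assumed key name, chosen for this sketch only. */
    static final String EXCLAMATION_COUNT = "exclamation.count";

    /** Assumed default, used when the key is absent or not a number. */
    private int count = 3;

    /** Mirrors the role of Bolt.prepare(Map, ...): read user parameters from the config map. */
    void prepare(Map<?, ?> stormConf) {
        Object value = stormConf == null ? null : stormConf.get(EXCLAMATION_COUNT);
        if (value instanceof Number) {
            count = ((Number) value).intValue();
        }
    }

    /** Appends the configured number of '!' to the input. */
    String process(String word) {
        StringBuilder sb = new StringBuilder(word);
        for (int i = 0; i < count; i++) {
            sb.append('!');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(EXCLAMATION_COUNT, 2);

        ConfigurableExclamation bolt = new ConfigurableExclamation();
        bolt.prepare(conf);
        System.out.println(bolt.process("hello"));
    }
}
```

A file spout with an empty constructor could read its input path from the same map in `open(...)` in the same way.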




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713070#comment-14713070
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-135000384
  
Can anyone have a look at `Configuration.java`? I am not sure whether the 
changes are OK.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709154#comment-14709154
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37744576
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
    }
 
    @Override
+   public void open(Configuration parameters) throws Exception {
+       /* parameters is task configuration, we can get storm configuration only from job configuration */
+       RuntimeContext ctx = super.getRuntimeContext();
+       if (ctx instanceof StreamingRuntimeContext)
+       {
+           Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
+           stormConf = new HashMap<String, Object>();
+           stormConf.putAll(jobConfiguration.getConfData());
--- End diff --

I will add a null check. In practice, jobConfiguration cannot be null. 
However, the StormBoltWrapper unit test mocks the StreamingRuntimeContext 
object, so its jobConfiguration can be null there.
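
A minimal sketch of such a null check, written without Flink types so that it stays self-contained. The helper `StormConfGuard.toStormConf` is hypothetical; its parameter stands for whatever map the job configuration exposes and may be null, for example when the runtime context is a mock:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; shows a defensive copy that tolerates a missing job configuration. */
final class StormConfGuard {

    /** Returns a fresh, never-null map even when the job-level data is missing. */
    static Map<String, Object> toStormConf(Map<String, Object> jobConfData) {
        Map<String, Object> stormConf = new HashMap<>();
        if (jobConfData != null) {
            stormConf.putAll(jobConfData);
        }
        return stormConf;
    }

    public static void main(String[] args) {
        // A mocked context may return null; the wrapper then passes an empty map on.
        System.out.println(toStormConf(null).isEmpty());

        Map<String, Object> data = new HashMap<>();
        data.put("path", "/home/user/file.txt");
        System.out.println(toStormConf(data).get("path"));
    }
}
```

Passing an empty map rather than null keeps `open(...)` and `prepare(...)` implementations from having to null-check their first argument.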




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709169#comment-14709169
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37745377
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
 ---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
 
    public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
            final SubmitOptions submitOpts) throws Exception {
-       ClusterUtil
-               .startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+       JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+       Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+       /* storm conf type must be Map<String, Object> */
+       Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --

The configuration in Storm is `public class Config extends HashMap<String, 
Object>`. Because it extends `HashMap<String, Object>`, it may be hard to 
convert an untyped Map into a Storm Config if I use one.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709194#comment-14709194
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37746515
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
 ---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
 
    public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
            final SubmitOptions submitOpts) throws Exception {
-       ClusterUtil
-               .startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+       JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+       Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+       /* storm conf type must be Map<String, Object> */
+       Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --

The definition of `Config` is not important here. Storm uses a raw map as 
parameter in `StormSubmitter.submitTopology(...)`, `Spout.open(...)`, and 
`Bolt.prepare(...)`. Thus, it would be possible to use a type other than 
`String` for keys.
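
The point about raw maps can be illustrated in plain Java: an unchecked cast of a `Map<?, ?>` to `Map<String, Object>` is not verified at the cast itself, so a non-`String` key only fails later when it is read as a `String`. The sketch below shows a copy that makes no assumption about the key type; the class `RawMapKeys` and its methods are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; handles a config map without assuming String keys. */
final class RawMapKeys {

    /** Copies any map, keeping keys and values as plain Objects. */
    static Map<Object, Object> copyUntyped(Map<?, ?> conf) {
        Map<Object, Object> copy = new HashMap<>();
        if (conf != null) {
            copy.putAll(conf);
        }
        return copy;
    }

    /** Returns true only if every key is a String, i.e. a Map<String, Object> view would be safe. */
    static boolean hasOnlyStringKeys(Map<?, ?> conf) {
        for (Object key : conf.keySet()) {
            if (!(key instanceof String)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Object, Object> conf = new HashMap<>();
        conf.put("path", "/home/user/file.txt");
        conf.put(1, 1024); // an Integer key, as in the wrapper tests of this pull request

        System.out.println(hasOnlyStringKeys(conf));
        System.out.println(copyUntyped(conf).equals(conf));
    }
}
```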




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709281#comment-14709281
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37750149
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
 ---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
 
    public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
            final SubmitOptions submitOpts) throws Exception {
-       ClusterUtil
-               .startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+       JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+       Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+       /* storm conf type must be Map<String, Object> */
+       Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --

Thanks. I will change this. Maybe the Configuration should get a new map, 
because `HashMap<String, Object> confData` is not enough.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709180#comment-14709180
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37745758
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
 ---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
 
    @SuppressWarnings("unchecked")
    @Test
+   public void testOpenWithStormConf() throws Exception {
+       final IRichBolt bolt = mock(IRichBolt.class);
+       final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+
+       Configuration jobConfiguration = new Configuration();
+       jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
+       jobConfiguration.setInteger(new String("delimitSize"), 1024);
+       Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+               new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+               mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
+               mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
+               new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
+               mock(TaskManagerRuntimeInfo.class));
+       StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
+               mock(KeySelector.class),
+               mock(StateHandleProvider.class), mock(Map.class));
+
+       wrapper.setup(mock(Output.class), ctx);
+       wrapper.open(mock(Configuration.class));
+
+       verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
--- End diff --

Thanks. I will check the map.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709207#comment-14709207
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-134184818
  
@mjsax Thank you very much. I missed the change in FlinkClient. I will fix it 
and test via bin/start-local.sh. In China, we currently cannot see the CI 
details, so it is hard to know why CI failed. Thank you for the reminder.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709226#comment-14709226
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-134190885
  
It fails in two tests. You should actually see this if you execute the tests 
locally. You should run the tests each time before you open or update a PR (at 
least for the module you changed).

```
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.751 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.BoltSplitITCase
testTopology(org.apache.flink.stormcompatibility.split.BoltSplitITCase)  Time elapsed: 0.635 sec  <<< ERROR!
java.lang.NullPointerException: null
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
    at org.apache.flink.stormcompatibility.split.StormSplitStreamBoltLocal.main(StormSplitStreamBoltLocal.java:42)
    at org.apache.flink.stormcompatibility.split.BoltSplitITCase.testTopology(BoltSplitITCase.java:25)
```
and
```
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.704 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.SpoutSplitITCase
testTopology(org.apache.flink.stormcompatibility.split.SpoutSplitITCase)  Time elapsed: 0.584 sec  <<< ERROR!
java.lang.NullPointerException: null
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
    at org.apache.flink.stormcompatibility.split.StormSplitStreamSpoutLocal.main(StormSplitStreamSpoutLocal.java:42)
    at org.apache.flink.stormcompatibility.split.SpoutSplitITCase.testTopology(SpoutSplitITCase.java:25)
```

Just out of curiosity: why can you not see Travis details? 




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709261#comment-14709261
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-134198533
  
@mjsax The reason why I cannot see the Travis details is the following 
(quoted from the reply mail I received):
The problem is that our CDN is currently blocked in mainland China. I'm 
talking to our CDN provider right now for getting a custom SSL certificate and 
domain set up, so we should be usable from China within the next weeks 
hopefully.

I will fix the code. I only ran the core tests and missed the tests in the 
examples. It is my fault. Thanks!






[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709176#comment-14709176
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37745589
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
 
@Override
+   public void open(Configuration parameters) throws Exception {
+   /* parameters is task configuration, we can get storm 
configuration only from job configuration */
+   RuntimeContext ctx = super.getRuntimeContext();
+   if (ctx instanceof StreamingRuntimeContext)
+   {
+   Configuration jobConfiguration = 
((StreamingRuntimeContext) ctx).getJobConfiguration();
+   stormConf = new HashMap<String, Object>();
+   stormConf.putAll(jobConfiguration.getConfData());
--- End diff --

Just because it cannot be `null` in the test does not mean it cannot be 
`null` in a real cluster deployment...
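
A defensive copy along these lines would avoid the problem. This is only a sketch with a hypothetical helper name (`copyOrEmpty`), written against plain `java.util` types rather than the Flink classes in the diff:

```java
import java.util.HashMap;
import java.util.Map;

class NullSafeConf {

    /**
     * Copies the job-level settings into a fresh map. A null source
     * yields an empty map instead of a NullPointerException.
     */
    static Map<String, Object> copyOrEmpty(Map<String, Object> jobConfData) {
        Map<String, Object> stormConf = new HashMap<>();
        if (jobConfData != null) {
            stormConf.putAll(jobConfData);
        }
        return stormConf;
    }
}
```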




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709189#comment-14709189
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37746137
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
 ---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, 
final Map<?, ?> conf, fina
 
public void submitTopologyWithOpts(final String topologyName, final 
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
-   ClusterUtil
-   
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), 
topology.getNumberOfTasks());
+   JobGraph jobGraph = 
topology.getStreamGraph().getJobGraph(topologyName);
+   Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+   /* storm conf type must be Map<String, Object> */
+   Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --

Your comment is not rendered correctly. Markdown is supported here. For 
inline source code, use a single backtick at the beginning and end. You meant 
`public class Config extends HashMap<String, Object>`, but `<String, Object>` 
is not shown above.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709179#comment-14709179
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37745701
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
 ---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, 
final Map<?, ?> conf, fina
 
public void submitTopologyWithOpts(final String topologyName, final 
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
-   ClusterUtil
-   
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), 
topology.getNumberOfTasks());
+   JobGraph jobGraph = 
topology.getStreamGraph().getJobGraph(topologyName);
+   Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+   /* storm conf type must be Map<String, Object> */
+   Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --

It is not about `HashMap` vs `Map`. It's about the generic types. You 
should not use `Map<String, Object>` but the raw type `Map`.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708995#comment-14708995
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

GitHub user ffbin opened a pull request:

https://github.com/apache/flink/pull/1046

[FLINK-2525]Add configuration support in Storm-compatibility

- Enables the config to be used in Spout.open() and Bolt.prepare().

Example:
public static void main(final String[] args) {
    String topologyId = "Streaming WordCount";
    final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    ...
    final Config conf = new Config();
    conf.put("wordsFile", "/home/user/");
    conf.put("delimitSize", 1024);
    final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology(topologyId, conf, builder.createTopology());
    Utils.sleep(10 * 1000);
    cluster.killTopology(topologyId);
    cluster.shutdown();
}

public class WordReader implements IRichSpout {

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            // The raw Map returns Object, so the value is cast to String.
            this.fileReader = new FileReader((String) conf.get("wordsFile"));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["
                    + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }
}

public final class StormBoltTokenizer implements IRichBolt {

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        // The raw Map returns Object, so the value is cast to Integer.
        this.delimitSize = (Integer) stormConf.get("delimitSize");
        this.collector = collector;
    }
}


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ffbin/flink FLINK-2525

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1046.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1046


commit c6aebc10b7a010cc9cd5fb5b6505fdbc942ab7b9
Author: ffbin 869218...@qq.com
Date:   2015-08-24T09:07:26Z

[FLINK-2525]Add configuration support in Storm-compatibility






[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709106#comment-14709106
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37741197
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 ---
@@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
}
 
@Override
+   public void open(Configuration parameters) throws Exception {
+   /* parameters is task configuration, we can get storm 
configuration only from job configuration */
+   RuntimeContext ctx = super.getRuntimeContext();
+   if (ctx instanceof StreamingRuntimeContext)
+   {
+   Configuration jobConfiguration = 
((StreamingRuntimeContext) ctx).getJobConfiguration();
+   stormConf = new HashMap<String, Object>();
+   stormConf.putAll(jobConfiguration.getConfData());
--- End diff --

Missing `null` check on `jobConfiguration` (or is this check not necessary? 
It is done in `StormBoltWrapper`).




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709108#comment-14709108
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37741352
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
 ---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
 
@SuppressWarnings("unchecked")
@Test
+   public void testOpenWithStormConf() throws Exception {
+   final IRichBolt bolt = mock(IRichBolt.class);
+   final StormBoltWrapper<Object, Object> wrapper = new 
StormBoltWrapper<Object, Object>(bolt);
+
+   Configuration jobConfiguration = new Configuration();
+   jobConfiguration.setString(new String("path"), new 
String("/home/user/file.txt"));
+   jobConfiguration.setInteger(new String("delimitSize"), 1024);
+   Environment env = new RuntimeEnvironment(new JobID(), new 
JobVertexID(), new ExecutionAttemptID(),
+   new String(), new String(), 1, 2, 
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+   mock(MemoryManager.class), 
mock(IOManager.class), mock(BroadcastVariableManager.class),
+   mock(AccumulatorRegistry.class), 
mock(InputSplitProvider.class), mock(Map.class),
+   new ResultPartitionWriter[1], new InputGate[1], 
mock(ActorGateway.class),
+   mock(TaskManagerRuntimeInfo.class));
+   StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, 
new ExecutionConfig(),
+   mock(KeySelector.class),
+   mock(StateHandleProvider.class), 
mock(Map.class));
+
+   wrapper.setup(mock(Output.class), ctx);
+   wrapper.open(mock(Configuration.class));
+
+   verify(bolt).prepare(any(Map.class), 
any(TopologyContext.class), any(OutputCollector.class));
--- End diff --

You should not check for `any(Map.class)` but check if the map contains the 
value you set above in `jobConfiguration`
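
One way to assert on the actual contents, sketched without Mockito so that it stays self-contained: a hand-written test double records the map it receives. `Preparable`, `RecordingBolt`, and `open` are hypothetical simplifications for illustration, not the Storm or Flink types from the diff.

```java
import java.util.HashMap;
import java.util.Map;

class ConfAssertionSketch {

    /** Hypothetical, simplified stand-in for a bolt's prepare(...) hook. */
    interface Preparable {
        void prepare(Map<String, Object> conf);
    }

    /** Test double that remembers the map it was prepared with. */
    static final class RecordingBolt implements Preparable {
        Map<String, Object> seen;

        @Override
        public void prepare(Map<String, Object> conf) {
            this.seen = conf;
        }
    }

    /** Hypothetical wrapper behavior: forwards a copy of the job settings to the bolt. */
    static void open(Preparable bolt, Map<String, Object> jobConf) {
        bolt.prepare(new HashMap<>(jobConf));
    }

    /** True only if the bolt received exactly the values that were configured. */
    static boolean receivedExpectedConf() {
        Map<String, Object> jobConf = new HashMap<>();
        jobConf.put("path", "/home/user/file.txt");
        jobConf.put("delimitSize", 1024);

        RecordingBolt bolt = new RecordingBolt();
        open(bolt, jobConf);
        return jobConf.equals(bolt.seen);
    }
}
```

With a mocking framework, the same idea is usually expressed by capturing the argument passed to `prepare(...)` and comparing it with the expected map instead of matching any map.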




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709107#comment-14709107
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37741303
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
 ---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
 
@SuppressWarnings("unchecked")
@Test
+   public void testOpenWithStormConf() throws Exception {
+   final IRichBolt bolt = mock(IRichBolt.class);
+   final StormBoltWrapper<Object, Object> wrapper = new 
StormBoltWrapper<Object, Object>(bolt);
+
+   Configuration jobConfiguration = new Configuration();
+   jobConfiguration.setString(new String("path"), new 
String("/home/user/file.txt"));
+   jobConfiguration.setInteger(new String("delimitSize"), 1024);
+   Environment env = new RuntimeEnvironment(new JobID(), new 
JobVertexID(), new ExecutionAttemptID(),
+   new String(), new String(), 1, 2, 
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+   mock(MemoryManager.class), 
mock(IOManager.class), mock(BroadcastVariableManager.class),
+   mock(AccumulatorRegistry.class), 
mock(InputSplitProvider.class), mock(Map.class),
+   new ResultPartitionWriter[1], new InputGate[1], 
mock(ActorGateway.class),
+   mock(TaskManagerRuntimeInfo.class));
+   StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, 
new ExecutionConfig(),
+   mock(KeySelector.class),
+   mock(StateHandleProvider.class), 
mock(Map.class));
+
+   wrapper.setup(mock(Output.class), ctx);
+   wrapper.open(mock(Configuration.class));
--- End diff --

Why do you mock here and not use `jobConfiguration`?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709109#comment-14709109
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37741464
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
 ---
@@ -52,6 +75,32 @@ public void testRunExecuteCancelInfinite() throws 
Exception {
}
 
@Test
+   public void testOpen() throws Exception {
+   final IRichSpout spout = mock(IRichSpout.class);
+   final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new 
StormSpoutWrapper<Tuple1<Integer>>(spout);
+
+   Configuration jobConfiguration = new Configuration();
+   jobConfiguration.setString(new String("path"), new 
String("/home/user/file.txt"));
+   jobConfiguration.setInteger(new String("delimitSize"), 1024);
+   Environment env = new RuntimeEnvironment(new JobID(), new 
JobVertexID(), new ExecutionAttemptID(),
+   new String(), new String(), 1, 2, 
jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+   mock(MemoryManager.class), 
mock(IOManager.class), mock(BroadcastVariableManager.class),
+   mock(AccumulatorRegistry.class), 
mock(InputSplitProvider.class), mock(Map.class),
+   new ResultPartitionWriter[1], new InputGate[1], 
mock(ActorGateway.class),
+   mock(TaskManagerRuntimeInfo.class));
+   StreamingRuntimeContext runtimeContext = new 
StreamingRuntimeContext(env, new ExecutionConfig(),
+   mock(KeySelector.class),
+   mock(StateHandleProvider.class), 
mock(Map.class));
+
+   spoutWrapper.setRuntimeContext(runtimeContext);
+   spoutWrapper.open(mock(Configuration.class));
+   final SourceFunction.SourceContext ctx = 
mock(SourceFunction.SourceContext.class);
+   spoutWrapper.cancel();
+   spoutWrapper.run(ctx);
+   verify(spout).open(any(Map.class), any(TopologyContext.class), 
any(SpoutOutputCollector.class));
+   }
+
--- End diff --

Same comment as in `StormBoltWrapperTest`.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709113#comment-14709113
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1046#discussion_r37741748
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
 ---
@@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, 
final Map<?, ?> conf, fina
 
public void submitTopologyWithOpts(final String topologyName, final 
Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
-   ClusterUtil
-   
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), 
topology.getNumberOfTasks());
+   JobGraph jobGraph = 
topology.getStreamGraph().getJobGraph(topologyName);
+   Configuration jobConfiguration = jobGraph.getJobConfiguration();
+
+   /* storm conf type must be Map<String, Object> */
+   Map<String, Object> stormConf = (Map<String, Object>)conf;
--- End diff --

The configuration in Storm is an untyped `Map`. Casting it to 
`Map<String, Object>` might fail. Flink should not limit the Map in this way.
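
The risk can be shown with plain `java.util` types. The sketch below uses made-up helper names (`copyRaw`, `castFailsOnUse`): the unchecked cast itself compiles and succeeds at runtime, and the failure only surfaces later, when a key is read as a `String`. Copying through a raw `Map` avoids the assumption.

```java
import java.util.HashMap;
import java.util.Map;

class RawConfSketch {

    /** Accepts any Storm-style config without assuming key or value types. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Map copyRaw(Map conf) {
        Map copy = new HashMap();
        if (conf != null) {
            copy.putAll(conf);
        }
        return copy;
    }

    /** Returns true if treating the config as Map<String, Object> fails on use. */
    @SuppressWarnings("unchecked")
    static boolean castFailsOnUse(Map<?, ?> conf) {
        // Compiles with an unchecked warning; nothing is verified here.
        Map<String, Object> typed = (Map<String, Object>) conf;
        try {
            for (String key : typed.keySet()) {
                // The compiler-inserted cast to String runs on each iteration.
                key.length();
            }
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```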




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709125#comment-14709125
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-134162565
  
I don't see changes in `FlinkClient`. Only in `FlinkLocalCluster`. Did you 
test by starting Flink via `bin/start-local.sh` (it would be even better to 
test in a real cluster)?




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709131#comment-14709131
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-134162997
  
Travis fails because you broke something...

