Hi,

I can submit the topology without any problems. Your code is fine.

If your program "exits silently", I would actually assume that you
submitted the topology successfully. Can you see the topology in the
JobManager web frontend? If not, do you see any errors in the log files?
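
To make the reasoning concrete, here is a rough sketch of the three cases
from my earlier mail (quoted below), written as plain Java data. The names
SubmissionModes, Mode and Plan are made up for illustration and are not part
of Flink or Storm; only the script and class names are taken from this
thread. As far as I can tell, with FlinkSubmitter the topology runs in the
cluster's JVM, so a client that returns right after submission is what I
would expect.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical helper (not a Flink or Storm API): it only records the three
// submission cases described in this thread as plain data.
class SubmissionModes {

    enum Mode { LOCAL_DEBUG, PSEUDO_DISTRIBUTED, DISTRIBUTED }

    static final class Plan {
        final String startScript;      // script to run beforehand, "" if none
        final String submitterClass;   // class name mentioned in the thread
        final boolean runsInClientJvm; // true if the topology runs in the submitting JVM

        Plan(String startScript, String submitterClass, boolean runsInClientJvm) {
            this.startScript = startScript;
            this.submitterClass = submitterClass;
            this.runsInClientJvm = runsInClientJvm;
        }
    }

    static final Map<Mode, Plan> PLANS = new EnumMap<>(Mode.class);
    static {
        PLANS.put(Mode.LOCAL_DEBUG,
                new Plan("", "FlinkLocalCluster", true));
        PLANS.put(Mode.PSEUDO_DISTRIBUTED,
                new Plan("bin/start-local.sh", "FlinkSubmitter", false));
        PLANS.put(Mode.DISTRIBUTED,
                new Plan("bin/start-cluster.sh", "FlinkSubmitter", false));
    }

    // Assumption: if the topology does not run in the client JVM, the client
    // only ships the job and may return right after submission.
    static boolean clientExitAfterSubmitIsExpected(Mode mode) {
        return !PLANS.get(mode).runsInClientJvm;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            Plan plan = PLANS.get(mode);
            System.out.println(mode + ": use " + plan.submitterClass
                    + ", client exit after submit expected = "
                    + clientExitAfterSubmitIsExpected(mode));
        }
    }
}
```

Your setup (bin/start-local.sh plus FlinkSubmitter) is the
PSEUDO_DISTRIBUTED row, which is why the web frontend and the log files are
the places to look.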

-Matthias

On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
> Dear Matthias,
> 
> Thank you for the reply! I am so sorry to respond late on the matter.
> 
>> I just double checked the Flink code and during translation from Storm
>> to Flink declareOutputFields() is called twice. You are right that it
>> does the same job twice, but that is actually not a problem. The Flink
>> code is cleaner this way, so I guess we will not change it.
> 
> Thank you for checking. I don't think it contributed any part of my
> current problem anyways. For my case though, it is called 3 times if
> the number is important at all.
> 
>> About lifecycle:
>> If you submit your code, during deployment, Spout.open() and
>> Bolt.prepare() should be called for each parallel instance of each
>> Spout/Bolt of your topology.
>>
>> About your submission (I guess this should solve your current problem):
>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>> but FlinkSubmitter. You have to distinguish three cases:
>>
>>   - local/debug/IDE mode: use FlinkLocalCluster
>>     => you do not need to start any Flink cluster before --
>> FlinkLocalCluster is started up in your current JVM
>>     * the purpose is local debugging in an IDE (this makes it easy to
>> set breakpoints and debug code)
>>
>>   - pseudo-distributed mode: use FlinkSubmitter
>>     => you start up a local Flink cluster via bin/start-local.sh
>>     * this local Flink cluster runs in its own JVM and looks like a real
>> cluster to the Flink client, i.e., "bin/flink run"
>>     * thus, you just use FlinkSubmitter as for a real cluster (with
>> JobManager/Nimbus hostname "localhost")
>>     * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is
>> started in your current JVM, but your code is shipped to the local
>> cluster you started up beforehand via bin/start-local.sh and executed in
>> that cluster's JVM
>>
>>   - distributed mode: use FlinkSubmitter
>>     => you start up Flink in a real cluster using bin/start-cluster.sh
>>     * you use "bin/flink run" to submit your code to the real cluster
> 
> Thank you for the explanation, now I have a clearer understanding of
> clusters and submitters. However, my problem is not fixed yet. Here's
> my code:
> 
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/App.java
> ////////////////////////////////////////////////////////////////////////////////
> 
> package myexample;
> 
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import myexample.spout.StandaloneSpout;
> import backtype.storm.generated.StormTopology;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.TopologyBuilder;
> import backtype.storm.topology.base.BaseBasicBolt;
> 
> import myexample.bolt.Node;
> import myexample.bolt.StandardBolt;
> 
> import java.util.Arrays;
> import java.util.List;
> 
> //
> import org.apache.flink.storm.api.FlinkTopology;
> //import org.apache.flink.storm.api.FlinkLocalCluster;
> import org.apache.flink.storm.api.FlinkSubmitter;
> //import org.apache.flink.storm.api.FlinkClient;
> import org.apache.flink.storm.api.FlinkTopologyBuilder;
> 
> public class App
> {
>     public static void main( String[] args ) throws Exception
>     {
>         int layer = 0;
>         StandaloneSpout spout = new StandaloneSpout();
>         Config conf = new Config();
>         conf.put(Config.TOPOLOGY_DEBUG, false);
>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>         //LocalCluster cluster = new LocalCluster();
> 
>         layer = Integer.parseInt(args[0]);
>         //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
>         FlinkSubmitter.submitTopology("topology", conf, BinaryTopology(spout, layer));
>         //Thread.sleep(5 * 1000);
>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>         //cluster.killTopology("topology");
>         //cluster.shutdown();
>     }
> 
>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>         return BinaryTopology(input, n, Arrays.asList((BaseBasicBolt) new StandardBolt()));
>     }
> 
>     public static FlinkTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>     //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>         //TopologyBuilder builder = new TopologyBuilder();
>         String sourceId = "src";
>         builder.setSpout(sourceId, input);
> 
> 
>         String boltId = "bolt";
>         builder.setBolt(boltId, new Node(), Math.pow(2, n)).shuffleGrouping(sourceId);
> 
>         return builder.createTopology();
>     }
> }
> 
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/spout/StandaloneSpout.java
> ////////////////////////////////////////////////////////////////////////////////
> 
> package myexample.spout;
> 
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> 
> import java.io.*;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.*;
> 
> public class StandaloneSpout extends BaseRichSpout {
> 
>     private SpoutOutputCollector mCollector;
> 
>     @Override
>     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>         this.mCollector = collector;
>     }
> 
>     @Override
>     public void nextTuple() {
>         long currentTime = System.currentTimeMillis();
> 
>         // TODO: Currently, do not check bound of list, because of experiment. (Avoid branch)
>         mCollector.emit(new Values(new String("aaa"), System.currentTimeMillis(), 0));
> 
>     }
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         System.out.println("declareOutputFields");
>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>     }
> }
> 
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/bolt/Node.java
> ////////////////////////////////////////////////////////////////////////////////
> 
> package myexample.bolt;
> 
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.BasicOutputCollector;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseBasicBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> import java.util.Map;
> 
> public class Node extends BaseBasicBolt {
> 
>     public static boolean isTupleEmpty(Tuple tuple) {
>       return false;
>     }
> 
>     @Override
>     public void prepare(Map stormConf, TopologyContext context) {
>         super.prepare(stormConf, context);
>     }
> 
>     @Override
>     public void cleanup() {
>         super.cleanup();
>     }
> 
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>     }
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("string1", "string2", "timestamp", "omitted"));
>     }
> }
> 
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/bolt/StandardBolt.java
> ////////////////////////////////////////////////////////////////////////////////
> 
> package myexample.bolt;
> 
> import java.util.Map;
> 
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.BasicOutputCollector;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseBasicBolt;
> import backtype.storm.tuple.Tuple;
> 
> public class StandardBolt extends BaseBasicBolt {
> 
>     @Override
>     public void prepare(Map stormConf, TopologyContext context) {
>         super.prepare(stormConf, context);
>     }
> 
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>     }
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>     }
> }
> 
> The problem is probably in the source code or somewhere in the project
> environment. I would really appreciate it if you could verify whether
> the code looks OK or not.
> 
>>
>> About further debugging: you can increase the log level to get more
>> information:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
> 
> I tried to inject `log4j.properties' file that I got from a sample
> flink-quickstart-java application created from `mvn
> archetype:generate' to a ./target/*.jar but it does not work. I tried
> this because placing that `log4j.properties' file under
> ./src/main/resources of my project did not work in the first place.
> 
> Thank you again for your help.
> With best regards,
> Shinhyung
> 
>> Hope this helps!
>>
>> -Matthias
>>
>> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>>> Dear Matthias,
>>>
>>> Thank you for replying!
>>>
>>>     that sounds weird and should not happen -- Spout.open() should get
>>>     called exactly once.
>>>
>>>
>>> That's what I thought too. I'm new to both Storm and Flink, so it's quite
>>> complicated for me to handle both yet; would it be helpful for me to
>>> know Storm's lifecycle and Flink's lifecycle? When submitTopology() is
>>> invoked, what should be called other than spout.open()?
>>>
>>>     I am not sure about multiple calls to
>>>     declareOutputFields though -- it might be called multiple times -- would
>>>     need to double check the code.
>>>
>>>
>>> I'll check my code too.
>>>
>>>
>>>     However, the call to declareOutputFields should be idempotent, so it
>>>     should actually not be a problem if it is called multiple times. Even if
>>>     Storm might call this method only once, there is no guarantee that it is
>>>     not called multiple times. If this is a problem for you, please let me
>>>     know. I think we could fix this and make sure the method is only called
>>>     once.
>>>
>>>
>>> Actually it doesn't seem to be a problem for now. It just does the same
>>> job multiple times.
>>>
>>>
>>>     It would be helpful if you could share your code. What do you mean by
>>>     "exits silently"? No submission happens? Did you check the logs? As you
>>>     mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>>
>>>
>>> The topology doesn't seem to continue. There's a set of initialization
>>> code in the open method of the program's spout and it looks hopeless if
>>> it's not invoked. Is there any way to check the logs other than using
>>> println() calls? I'm running it on the command line with
>>> `bin/start-local.sh' running in the background and `bin/flink run'.
>>>
>>>
>>>     Btw: lately we fixed a couple of bugs. I would suggest that you use the
>>>     latest version from Flink master branch. It should work with 0.10.1
>>>     without problems.
>>>
>>>
>>> It was very tedious for me to deal with a pom.xml file and .m2
>>> repository. So I preferred to use Maven Central. But I should try with
>>> the master branch if I have to.
>>>
>>> I will quickly check if I could share some of the code.
>>>
>>> Thank you again for the help!
>>> With best regards,
>>> Shinhyung Yang
>>>
>>>
>>>
>>>     -Matthias
>>>
>>>
>>>
>>>     On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>>>     > Howdies to everyone,
>>>     >
>>>     > I'm trying to use the storm compatibility layer on Flink 0.10.1. The
>>>     > original storm topology works fine on Storm 0.9.5 and I have
>>>     > incorporated FlinkLocalCluster, FlinkTopologyBuilder, and
>>>     > FlinkTopology classes according to the programming guide
>>>     > (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>>>     > I'm running it on Oracle Java 8 (1.8.0_66-b17) on CentOS 7 (7.2.1511).
>>>     > What happens is, it seems to be going all the way to the submitTopology
>>>     > method without any problem; however, it doesn't invoke the open method of
>>>     > the Spout class, but the declareOutputFields method is called multiple
>>>     > times and the program exits silently. Do you guys have any idea what's
>>>     > going on here or have any suggestions? If needed, then please ask me
>>>     > for more information.
>>>     >
>>>     > Thank you for reading.
>>>     > With best regards,
>>>     > Shinhyung Yang
>>>     >
>>>
>>
