Hi, I can submit the topology without any problems. Your code is fine.
If your program "exits silently" I would actually assume, that you submitted the topology successfully. Can you see the topology in JobManager WebFrontend? If not, do you see any errors in the log files? -Matthias On 01/14/2016 07:37 AM, Shinhyung Yang wrote: > Dear Matthias, > > Thank you for the reply! I am so sorry to respond late on the matter. > >> I just double checked the Flink code and during translation from Storm >> to Flink declareOuputFields() is called twice. You are right that is >> does the same job twice, but that is actually not a problem. The Flink >> code is cleaner this way to I guess we will not change it. > > Thank you for checking. I don't think it contributed any part of my > current problem anyways. For my case though, it is called 3 times if > the number is important at all. > >> About lifecyle: >> If you submit your code, during deployment, Spout.open() and >> Bolt.prepare() should be called for each parallel instance on each >> Spout/Bolt of your topology. >> >> About your submission (I guess this should solve your current problem): >> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster, >> but FlinkSubmitter. 
>> You have to distinguish three cases:
>>
>> - local/debug/IDE mode: use FlinkLocalCluster
>>   => you do not need to start any Flink cluster beforehand --
>>      FlinkLocalCluster is started up in your current JVM
>>   * the purpose is local debugging in an IDE (this allows you to easily
>>     set breakpoints and debug code)
>>
>> - pseudo-distributed mode: use FlinkSubmitter
>>   => you start up a local Flink cluster via bin/start-local.sh
>>   * this local Flink cluster runs in its own JVM and looks like a real
>>     cluster to the Flink client, i.e., "bin/flink run"
>>   * thus, you just use FlinkSubmitter as you would for a real cluster
>>     (with JobManager/Nimbus hostname "localhost")
>>   * in contrast to FlinkLocalCluster, no "internal Flink cluster" is
>>     started in your current JVM; instead, your code is shipped to the
>>     local cluster you started beforehand via bin/start-local.sh and
>>     executed in that JVM
>>
>> - distributed mode: use FlinkSubmitter
>>   => you start up Flink on a real cluster using bin/start-cluster.sh
>>   * you use "bin/flink run" to submit your code to the real cluster
>
> Thank you for the explanation; now I have a clearer understanding of
> clusters and submitters. However, my problem is not fixed yet.
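The three submission modes described above can be condensed into one sketch. This is illustrative only, written against the Flink 0.10 storm-compatibility API named in this thread (FlinkLocalCluster, FlinkSubmitter, FlinkTopologyBuilder); the topology-building calls are elided, and the class name `SubmitModes` and the command-line switch are made up for the example:

```java
import backtype.storm.Config;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.api.FlinkTopologyBuilder;

public class SubmitModes {
    public static void main(String[] args) throws Exception {
        FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        // ... setSpout()/setBolt() calls go here ...
        FlinkTopology topology = builder.createTopology();
        Config conf = new Config();

        boolean runInIde = args.length > 0 && args[0].equals("local");
        if (runInIde) {
            // local/debug/IDE mode: an in-process cluster is started
            // inside this JVM; no bin/start-local.sh needed
            FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
            cluster.submitTopology("topology", conf, topology);
        } else {
            // pseudo-distributed or distributed mode: the jar is shipped
            // to a cluster started beforehand via bin/start-local.sh
            // (localhost) or bin/start-cluster.sh (real cluster)
            FlinkSubmitter.submitTopology("topology", conf, topology);
        }
    }
}
```

The key point is that the two paths are mutually exclusive: with bin/start-local.sh running, only the FlinkSubmitter branch reaches that cluster; the FlinkLocalCluster branch always spins up its own in-JVM cluster and ignores the external one.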
> Here's my code:
>
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/App.java
> ////////////////////////////////////////////////////////////////////////////////
>
> package myexample;
>
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import myexample.spout.StandaloneSpout;
> import backtype.storm.generated.StormTopology;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.TopologyBuilder;
> import backtype.storm.topology.base.BaseBasicBolt;
>
> import myexample.bolt.Node;
> import myexample.bolt.StandardBolt;
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.storm.api.FlinkTopology;
> //import org.apache.flink.storm.api.FlinkLocalCluster;
> import org.apache.flink.storm.api.FlinkSubmitter;
> //import org.apache.flink.storm.api.FlinkClient;
> import org.apache.flink.storm.api.FlinkTopologyBuilder;
>
> public class App
> {
>     public static void main( String[] args ) throws Exception
>     {
>         int layer = 0;
>         StandaloneSpout spout = new StandaloneSpout();
>         Config conf = new Config();
>         conf.put(Config.TOPOLOGY_DEBUG, false);
>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>         //LocalCluster cluster = new LocalCluster();
>
>         layer = Integer.parseInt(args[0]);
>         //cluster.submitTopology("topology", conf,
>         //        BinaryTopology(spout, layer));
>         FlinkSubmitter.submitTopology("topology", conf,
>                 BinaryTopology(spout, layer));
>         //Thread.sleep(5 * 1000);
>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>         //cluster.killTopology("topology");
>         //cluster.shutdown();
>     }
>
>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>         return BinaryTopology(input, n,
>                 Arrays.asList((BaseBasicBolt) new StandardBolt()));
>     }
>
>     public static FlinkTopology BinaryTopology(IRichSpout input, int n,
>             List<BaseBasicBolt> boltList) {
>     //public static StormTopology BinaryTopology(IRichSpout input, int n,
>     //        List<BaseBasicBolt> boltList) {
>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>         //TopologyBuilder builder = new TopologyBuilder();
>         String sourceId = "src";
>         builder.setSpout(sourceId, input);
>
>         String boltId = "bolt";
>         builder.setBolt(boltId, new Node(),
>                 Math.pow(2, n)).shuffleGrouping(sourceId);
>
>         return builder.createTopology();
>     }
> }
>
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/spout/StandaloneSpout.java
> ////////////////////////////////////////////////////////////////////////////////
>
> package myexample.spout;
>
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
>
> import java.io.*;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.*;
>
> public class StandaloneSpout extends BaseRichSpout {
>
>     private SpoutOutputCollector mCollector;
>
>     @Override
>     public void open(Map conf, TopologyContext context,
>             SpoutOutputCollector collector) {
>         this.mCollector = collector;
>     }
>
>     @Override
>     public void nextTuple() {
>         long currentTime = System.currentTimeMillis();
>
>         // TODO: Currently, do not check bound of list, because of
>         // experiment. (Avoid branch)
>         mCollector.emit(new Values(new String("aaa"),
>                 System.currentTimeMillis(), 0));
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         System.out.println("declareOutputFields");
>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>     }
> }
>
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/bolt/Node.java
> ////////////////////////////////////////////////////////////////////////////////
>
> package myexample.bolt;
>
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.BasicOutputCollector;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseBasicBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> import java.util.Map;
>
> public class Node extends BaseBasicBolt {
>
>     public static boolean isTupleEmpty(Tuple tuple) {
>         return false;
>     }
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context) {
>         super.prepare(stormConf, context);
>     }
>
>     @Override
>     public void cleanup() {
>         super.cleanup();
>     }
>
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("string1", "string2", "timestamp",
>                 "omitted"));
>     }
> }
>
> ////////////////////////////////////////////////////////////////////////////////
> // ./src/main/java/myexample/bolt/StandardBolt.java
> ////////////////////////////////////////////////////////////////////////////////
>
> package myexample.bolt;
>
> import java.util.Map;
>
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.BasicOutputCollector;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseBasicBolt;
> import backtype.storm.tuple.Tuple;
>
> public class StandardBolt extends BaseBasicBolt {
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context) {
>         super.prepare(stormConf, context);
>     }
>
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>     }
> }
> The problem is probably in the source code, or perhaps somewhere else
> in the project environment. I would really appreciate it if you could
> verify whether the code looks OK or not.
>
>> About further debugging: you can increase the log level to get more
>> information:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
>
> I tried to inject the `log4j.properties' file that I got from a sample
> flink-quickstart-java application created with `mvn archetype:generate'
> into a ./target/*.jar, but it does not work. I tried this because
> placing that `log4j.properties' file under ./src/main/resources of my
> project did not work in the first place.
>
> Thank you again for your help.
> With best regards,
> Shinhyung
>
>> Hope this helps!
>>
>> -Matthias
>>
>> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>>> Dear Matthias,
>>>
>>> Thank you for replying!
>>>
>>> that sounds weird and should not happen -- Spout.open() should get
>>> called exactly once.
>>>
>>> That's what I thought too. I'm new to both Storm and Flink, so it's
>>> quite complicated for me to handle both yet; would it be helpful for
>>> me to know Storm's lifecycle and Flink's lifecycle? When
>>> submitTopology() is invoked, what should be called other than
>>> Spout.open()?
>>>
>>> I am not sure about multiple calls to declareOutputFields though --
>>> it might be called multiple times -- I would need to double-check
>>> the code.
>>>
>>> I'll check my code too.
>>>
>>> However, the call to declareOutputFields should be idempotent, so it
>>> should actually not be a problem if it is called multiple times. Even
>>> if Storm might call this method only once, there is no guarantee that
>>> it is not called multiple times. If this is a problem for you, please
>>> let me know. I think we could fix this and make sure the method is
>>> only called once.
>>>
>>> Actually, it doesn't seem to be a problem for now.
>>> It just does the same job multiple times.
>>>
>>> It would be helpful if you could share your code. What do you mean by
>>> "exits silently"? No submission happens? Did you check the logs? As
>>> you mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>>
>>> The topology doesn't seem to continue. There's a set of
>>> initialization code in the open method of the program's spout, and it
>>> looks hopeless if it's not invoked. Is there any way to check the
>>> logs other than using println() calls? I'm running it on the command
>>> line with `bin/start-local.sh' running in the background and
>>> `bin/flink run'.
>>>
>>> Btw: lately we fixed a couple of bugs. I would suggest that you use
>>> the latest version from the Flink master branch. It should work with
>>> 0.10.1 without problems.
>>>
>>> It was very tedious for me to deal with a pom.xml file and the .m2
>>> repository, so I preferred to use Maven Central. But I will try the
>>> master branch if I have to.
>>>
>>> I will quickly check if I can share some of the code.
>>>
>>> Thank you again for the help!
>>> With best regards,
>>> Shinhyung Yang
>>>
>>> -Matthias
>>>
>>> On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>>> > Howdies to everyone,
>>> >
>>> > I'm trying to use the Storm compatibility layer on Flink 0.10.1.
>>> > The original Storm topology works fine on Storm 0.9.5, and I have
>>> > incorporated the FlinkLocalCluster, FlinkTopologyBuilder, and
>>> > FlinkTopology classes according to the programming guide
>>> > (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>>> > I'm running it on Oracle Java 8 (1.8.0_66-b17) on CentOS 7 (7.2.1511).
>>> > What happens is, it seems to go all the way to the submitTopology
>>> > method without any problem; however, it doesn't invoke the open
>>> > method of the Spout class, the declareOutputFields method is called
>>> > multiple times, and the program exits silently.
>>> > Do you guys have any idea what's going on here, or do you have any
>>> > suggestions? If needed, please ask me for more information.
>>> >
>>> > Thank you for reading.
>>> > With best regards,
>>> > Shinhyung Yang
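A note on the logging question above: when a topology is submitted via `bin/flink run`, the client- and cluster-side logging is configured by `conf/log4j.properties` of the Flink installation, not by a `log4j.properties` bundled into the job jar -- which would explain why injecting the file into `./target/*.jar` appeared to have no effect. For completeness, a minimal log4j 1.2 configuration along these lines (the exact contents are an assumption here, not a copy of the quickstart's file) can be placed under `./src/main/resources` so that Maven packages it onto the jar's classpath:

```properties
# Minimal log4j 1.2 configuration: log INFO and above to stdout.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```

To change what the local cluster itself logs (e.g. to see Spout.open() being called), edit the file under the Flink distribution's `conf/` directory instead and restart the cluster.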