Hi Robert,

Let me know if you experience the same issue with 0.9.0.1. One thing that caught my eye was this:
2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.

In 0.9.1 that error can indicate that something other than the thrift client (like ssh, telnet, or a security scanner) is accessing nimbus' thrift port. More details here: https://github.com/apache/incubator-storm/pull/3

The fact that you can run a different topology class from the same jar file suggests that it is not a problem with the upload process. Is it possible that something in your topology (like a Cassandra client, etc.) is misconfigured to point to nimbus' host and port? A few things worth trying are sketched below the quoted thread.

- Taylor

On Mar 11, 2014, at 7:48 PM, Robert Lee <[email protected]> wrote:

> I will downgrade to storm-0.9.0.1 and see if the error persists in that
> version as well.
>
> On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee <[email protected]> wrote:
> Yes -- more details:
>
> Storm version: 0.9.1-incubating, installed using a variant of your
> storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
>
> Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m, a
> zookeeper (3.3.5) node with 512mb, and a kafka (0.8.0) node with 512mb.
> Persisting to a local cassandra cluster.
>
> Here's an example topology I'm running. This topology works in both local
> and distributed mode. A variant of this topology (more persisting and more
> complicated functions on the kafka stream) works in local mode but gives
> the thrift error reported above when submitting.
>
> public class SentenceAggregationTopology {
>
>     private final BrokerHosts brokerHosts;
>
>     public SentenceAggregationTopology(String kafkaZookeeper) {
>         brokerHosts = new ZkHosts(kafkaZookeeper);
>     }
>
>     public StormTopology buildTopology() {
>         return buildTopology(null);
>     }
>
>     public StormTopology buildTopology(LocalDRPC drpc) {
>         TridentKafkaConfig kafkaConfig =
>                 new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         TransactionalTridentKafkaSpout kafkaSpout =
>                 new TransactionalTridentKafkaSpout(kafkaConfig);
>         KafkaSentenceMapper mapper =
>                 new KafkaSentenceMapper("playlist", "testtable", "word", "count");
>         TridentTopology topology = new TridentTopology();
>
>         TridentState wordCounts = topology.newStream("kafka", kafkaSpout)
>                 .shuffle()
>                 .each(new Fields("str"), new WordSplit(), new Fields("word"))
>                 .groupBy(new Fields("word"))
>                 .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
>                         new Count(), new Fields("aggregates_words"))
>                 .parallelismHint(2);
>
>         topology.newDRPCStream("words", drpc)
>                 .each(new Fields("args"), new Split(), new Fields("word"))
>                 .groupBy(new Fields("word"))
>                 .stateQuery(wordCounts, new Fields("word"), new MapGet(),
>                         new Fields("count"))
>                 .each(new Fields("count"), new FilterNull())
>                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
>
>         return topology.build();
>     }
>
>     public static void main(String[] args) throws Exception {
>         final int TIME_INTERVAL_IN_MILLIS = 1000;
>
>         String kafkaZk = args[0];
>         SentenceAggregationTopology sentenceAggregationTopology =
>                 new SentenceAggregationTopology(kafkaZk);
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
>                 TIME_INTERVAL_IN_MILLIS);
>         config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);
>
>         if (args != null && args.length > 2) {
>             String name = args[2];
>             config.setNumWorkers(4);
>             config.setMaxTaskParallelism(4);
>             StormSubmitter.submitTopology(name, config,
>                     sentenceAggregationTopology.buildTopology());
>         } else {
>             LocalDRPC drpc = new LocalDRPC();
>             config.setNumWorkers(2);
>             config.setDebug(true);
>             config.setMaxTaskParallelism(2);
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafka", config,
>                     sentenceAggregationTopology.buildTopology(drpc));
>             while (true) {
>                 System.out.println("Word count: " + drpc.execute("words", "the"));
>                 Utils.sleep(TIME_INTERVAL_IN_MILLIS);
>             }
>         }
>     }
> }
>
> On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <[email protected]> wrote:
> Hi Robert,
>
> Can you provide additional details, like what storm version you are using,
> etc.?
>
> -Taylor
>
> On Mar 11, 2014, at 6:57 PM, Robert Lee <[email protected]> wrote:
>
> > After submitting my topology via the storm jar command:
> > ....
> > ....
> > ....
> > 562 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar
> > storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location:
> > storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location:
> > storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf
> > {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
> >     ... 2 more
> > Caused by: java.net.SocketException: Connection reset
> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
> >     ... 7 more
> >
> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to
> > storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to
> > storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client:
> > storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client:
> > storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of
> > 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
> >
> > Thoughts on how to proceed? I tried boosting memory from 256mb to 1024mb
> > on the nimbus and supervisor nodes with no luck. The jar file is roughly
> > 18MB in size, and I can run another topology within the jar fine, but the
> > one I want to run (more complex) fails.
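
As promised above, a few things to try:

1. To rule out stray traffic, watch what actually connects to nimbus' thrift port (6627 by default) while the submit is in flight. A rough check on the nimbus host, assuming a Linux box with netstat available:

    # List established TCP connections to nimbus' thrift port (default 6627)
    # and the owning process; run while the submission is in progress.
    storm@nimbus:~$ netstat -tnp | grep 6627

Anything showing up there besides your submitting client would be worth a closer look.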
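2. It is also worth measuring how large the serialized topology actually is, since the rejected frame (2064605 bytes, about 2 MB) may simply be your own topology rather than foreign traffic. A minimal sketch, assuming the thrift classes shaded as org.apache.thrift7 in storm-core 0.9.x (adjust the import if your jar shades them elsewhere):

    import org.apache.thrift7.TSerializer;
    import backtype.storm.generated.StormTopology;

    public class TopologySizeCheck {
        public static void main(String[] args) throws Exception {
            StormTopology topology =
                    new SentenceAggregationTopology(args[0]).buildTopology();
            // Serialize with thrift's default binary protocol -- roughly the
            // payload StormSubmitter pushes through nimbus' thrift port.
            byte[] bytes = new TSerializer().serialize(topology);
            System.out.println("Serialized topology: " + bytes.length + " bytes");
        }
    }

If that prints something near or above 1 MB for the failing topology but not for the one that submits cleanly, the frame-size rejection is coming from your own submission.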
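3. In that case, one workaround is to raise the server-side read buffer. A hedged sketch of the storm.yaml change on the nimbus host -- note that the nimbus.thrift.max_buffer_size key and its 1048576-byte default are taken from later Storm releases and may not exist in 0.9.1-incubating, so check conf/defaults.yaml in your distribution first:

    # storm.yaml on the nimbus host; restart nimbus after editing.
    # Bumps the thrift read buffer to 4 MB so a ~2 MB frame is accepted.
    nimbus.thrift.max_buffer_size: 4194304

The better long-term fix is usually to shrink what gets serialized into the topology: large objects held as fields of your spouts and bolts end up inside the serialized topology itself.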