Robert -- I just saw this thread. I ran into the same issue myself recently. Taylor posted the full conversation we had back to the list, but you may not have noticed it, since it was several messages squashed into one.
In any case, I think you can fix this by increasing the maximum allowable buffer size on Nimbus in storm.yaml, using the config option "nimbus.thrift.max_buffer_size" (it is specified in bytes). The default is something like 1MB; I upped mine to 32MB to accommodate a 6MB topology. Judging by the frame size in your log (2064605 bytes), your topology is only about 2MB, so you may not need to go that extreme. I think the only downside of increasing this is that you lose the protection it was designed to provide: namely, a malicious or buggy client connecting to Nimbus can exhaust Nimbus's memory by sending very large Thrift messages.
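For reference, the change is a one-line addition to storm.yaml on the Nimbus host (a sketch; 33554432 is just 32MB expressed in bytes -- check defaults.yaml in your Storm version for the exact default, and restart Nimbus after editing):

    # storm.yaml on the Nimbus host
    # 32MB = 33554432 bytes
    nimbus.thrift.max_buffer_size: 33554432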
On Thu, Mar 13, 2014 at 11:44 AM, Robert Lee <[email protected]> wrote:

> Downgrading to storm-0.9.0.1 remedied the issue for me. I no longer receive any exceptions. I'm fairly certain nothing is listening to the thrift port besides the storm jar command I issue. The only thing I can think of is that I am connected to the nimbus node to view the logs in real time. When I have finished other things on my plate, I'll revisit this issue to try to find a cause.
>
> On Wed, Mar 12, 2014 at 2:56 PM, P. Taylor Goetz <[email protected]> wrote:
>
>> Hi Robert, let me know if you experience the same issue with 0.9.0.1. One thing that caught my eye was this:
>>
>> 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
>>
>> In 0.9.1 that can indicate that something other than the thrift client (like ssh, telnet, or a security scanner) is accessing nimbus' thrift port. More details here: https://github.com/apache/incubator-storm/pull/3
>>
>> The fact that you can run a different topology class from the same jar file suggests that it is not a problem with the upload process.
>>
>> Is it possible that something in your topology (like a cassandra client, etc.) is misconfigured to point to nimbus' host and port?
>>
>> - Taylor
>>
>> On Mar 11, 2014, at 7:48 PM, Robert Lee <[email protected]> wrote:
>>
>> I will downgrade to storm-0.9.0.1 and see if the error persists in that version as well.
>>
>> On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee <[email protected]> wrote:
>>
>>> Yes -- more details:
>>>
>>> Storm version: 0.9.1-incubating, installed using a variant of your storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
>>>
>>> Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m, a zookeeper (3.3.5) 512mb node, and a kafka (0.8.0) 512mb node. Persisting to a local cassandra cluster.
>>>
>>> Here's an example topology I'm running. This topology works in both local and distributed mode. A variant of this topology (more persisting and more complicated functions on the kafka stream) works in local mode but gives the thrift error reported above when submitting.
>>>
>>> public class SentenceAggregationTopology {
>>>
>>>     private final BrokerHosts brokerHosts;
>>>
>>>     public SentenceAggregationTopology(String kafkaZookeeper) {
>>>         brokerHosts = new ZkHosts(kafkaZookeeper);
>>>     }
>>>
>>>     public StormTopology buildTopology() {
>>>         return buildTopology(null);
>>>     }
>>>
>>>     public StormTopology buildTopology(LocalDRPC drpc) {
>>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
>>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>         TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
>>>         KafkaSentenceMapper mapper = new KafkaSentenceMapper("playlist", "testtable", "word", "count");
>>>         TridentTopology topology = new TridentTopology();
>>>
>>>         TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle()
>>>                 .each(new Fields("str"), new WordSplit(), new Fields("word"))
>>>                 .groupBy(new Fields("word"))
>>>                 .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
>>>                         new Count(), new Fields("aggregates_words"))
>>>                 .parallelismHint(2);
>>>
>>>         topology.newDRPCStream("words", drpc)
>>>                 .each(new Fields("args"), new Split(), new Fields("word"))
>>>                 .groupBy(new Fields("word"))
>>>                 .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
>>>                 .each(new Fields("count"), new FilterNull())
>>>                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
>>>
>>>         return topology.build();
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final int TIME_INTERVAL_IN_MILLIS = 1000;
>>>
>>>         String kafkaZk = args[0];
>>>         SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk);
>>>
>>>         Config config = new Config();
>>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, TIME_INTERVAL_IN_MILLIS);
>>>         config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);
>>>
>>>         if (args != null && args.length > 2) {
>>>             String name = args[2];
>>>             config.setNumWorkers(4);
>>>             config.setMaxTaskParallelism(4);
>>>             StormSubmitter.submitTopology(name, config, sentenceAggregationTopology.buildTopology());
>>>         } else {
>>>             LocalDRPC drpc = new LocalDRPC();
>>>             config.setNumWorkers(2);
>>>             config.setDebug(true);
>>>             config.setMaxTaskParallelism(2);
>>>             LocalCluster cluster = new LocalCluster();
>>>             cluster.submitTopology("kafka", config, sentenceAggregationTopology.buildTopology(drpc));
>>>             while (true) {
>>>                 System.out.println("Word count: " + drpc.execute("words", "the"));
>>>                 Utils.sleep(TIME_INTERVAL_IN_MILLIS);
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <[email protected]> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> Can you provide additional details, like what storm version you are using, etc.?
>>>>
>>>> -Taylor
>>>>
>>>> > On Mar 11, 2014, at 6:57 PM, Robert Lee <[email protected]> wrote:
>>>> >
>>>> > After submitting my topology via the storm jar command:
>>>> > ....
>>>> > ....
>>>> > ....
>>>> > 562  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>>>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>>>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
>>>> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
>>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
>>>> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
>>>> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>>>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>>>> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>>>> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>>>> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
>>>> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
>>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
>>>> >     ... 2 more
>>>> > Caused by: java.net.SocketException: Connection reset
>>>> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
>>>> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>>>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>>>> >     ... 7 more
>>>> >
>>>> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
>>>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>>> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
>>>> >
>>>> > Thoughts on how to proceed? I tried boosting memory from 256mb to 1024mb on the nimbus and supervisor nodes with no luck. The jar file is roughly 18MB in size, and I can run another topology within the jar fine, but the one I want to run (more complex) fails.
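One more thought: before (or instead of) raising the limit, you can check how big the serialized topology actually is. A rough sketch, assuming the shaded Thrift classes that Storm 0.9.x ships (org.apache.thrift7.TSerializer -- the same shaded runtime that appears in your stack trace); the frame Nimbus reads also carries the serialized config and request overhead, so treat the number as a lower bound:

    import org.apache.thrift7.TException;
    import org.apache.thrift7.TSerializer;

    import backtype.storm.generated.StormTopology;

    public class TopologySizeCheck {
        public static void main(String[] args) throws TException {
            // Build the same topology you would submit (class from Robert's mail above).
            StormTopology topology = new SentenceAggregationTopology(args[0]).buildTopology();
            // Serialize with the (shaded) Thrift runtime and report the byte count,
            // to compare against nimbus.thrift.max_buffer_size.
            byte[] bytes = new TSerializer().serialize(topology);
            System.out.printf("Serialized topology: %d bytes (%.2f MB)%n",
                    bytes.length, bytes.length / (1024.0 * 1024.0));
        }
    }

If that prints something well under your configured buffer size and submission still fails, that would point back at Taylor's theory of some other process hitting the thrift port.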
