Downgrading to storm-0.9.0.1 remedied the issue for me. I no longer receive any exceptions. I'm fairly certain nothing is listening on the thrift port besides the storm jar command I issue. The only thing I can think of is that I was connected to the nimbus node to view the logs in real time. Once I've cleared a few other things off my plate, I'll revisit this issue and try to pin down a cause.
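For anyone else chasing this: the frame-size ERROR should be easy to reproduce by pointing any non-thrift client at nimbus' thrift port, which is how I plan to test the theory. A minimal sketch (the hostname is a placeholder; 6627 is storm's default nimbus.thrift.port):

import java.io.OutputStream;
import java.net.Socket;

// Hypothetical repro sketch: TNonblockingServer reads the first four bytes
// on the wire as a frame length, so any non-thrift bytes (an ssh banner, an
// HTTP request, a scanner probe) become a huge bogus frame size and trigger
// the "bigger than the maximum allowable buffer size" ERROR in nimbus.log.
public class BogusFrameRepro {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("nimbus.example.com", 6627)) {
            OutputStream out = socket.getOutputStream();
            // "GET " decodes as the frame length 0x47455420 (~1.2 GB).
            out.write("GET / HTTP/1.0\r\n\r\n".getBytes("US-ASCII"));
            out.flush();
        }
    }
}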
On Wed, Mar 12, 2014 at 2:56 PM, P. Taylor Goetz <[email protected]> wrote:

> Hi Robert, let me know if you experience the same issue with 0.9.0.1. One
> thing that caught my eye was this:
>
> 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size
> of 2064605, which is bigger than the maximum allowable buffer size for ALL
> connections.
>
> In 0.9.1 that can indicate that something other than the thrift client
> (like ssh, telnet, or a security scanner) is accessing nimbus' thrift port.
> More details here: https://github.com/apache/incubator-storm/pull/3
>
> The fact that you can run a different topology class from the same jar
> file suggests that it is not a problem with the upload process.
>
> Is it possible that something in your topology (like a cassandra client,
> etc.) is misconfigured to point to nimbus' host and port?
>
> - Taylor
>
> On Mar 11, 2014, at 7:48 PM, Robert Lee <[email protected]> wrote:
>
> I will downgrade to storm-0.9.0.1 and see if the error persists in that
> version as well.
>
> On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee <[email protected]> wrote:
>
>> Yes -- more details:
>>
>> Storm version: 0.9.1-incubating, installed using a variant of your
>> storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
>>
>> Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m, a
>> zookeeper (3.3.5) 512mb node, and a kafka (0.8.0) 512mb node, persisting
>> to a local cassandra cluster.
>>
>> Here's an example topology I'm running. This topology works in both
>> local and distributed mode. A variant of it (more persisting and more
>> complicated functions on the kafka stream) works in local mode but gives
>> the thrift error reported above when submitted.
>>
>> public class SentenceAggregationTopology {
>>
>>     private final BrokerHosts brokerHosts;
>>
>>     public SentenceAggregationTopology(String kafkaZookeeper) {
>>         brokerHosts = new ZkHosts(kafkaZookeeper);
>>     }
>>
>>     public StormTopology buildTopology() {
>>         return buildTopology(null);
>>     }
>>
>>     public StormTopology buildTopology(LocalDRPC drpc) {
>>         TridentKafkaConfig kafkaConfig =
>>             new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>         TransactionalTridentKafkaSpout kafkaSpout =
>>             new TransactionalTridentKafkaSpout(kafkaConfig);
>>         KafkaSentenceMapper mapper =
>>             new KafkaSentenceMapper("playlist", "testtable", "word", "count");
>>         TridentTopology topology = new TridentTopology();
>>
>>         TridentState wordCounts = topology.newStream("kafka", kafkaSpout)
>>             .shuffle()
>>             .each(new Fields("str"), new WordSplit(), new Fields("word"))
>>             .groupBy(new Fields("word"))
>>             .persistentAggregate(
>>                 CassandraBackingMap.nonTransactional(mapper),
>>                 new Count(), new Fields("aggregates_words"))
>>             .parallelismHint(2);
>>
>>         topology.newDRPCStream("words", drpc)
>>             .each(new Fields("args"), new Split(), new Fields("word"))
>>             .groupBy(new Fields("word"))
>>             .stateQuery(wordCounts, new Fields("word"), new MapGet(),
>>                 new Fields("count"))
>>             .each(new Fields("count"), new FilterNull())
>>             .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
>>
>>         return topology.build();
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         final int TIME_INTERVAL_IN_MILLIS = 1000;
>>
>>         String kafkaZk = args[0];
>>         SentenceAggregationTopology sentenceAggregationTopology =
>>             new SentenceAggregationTopology(kafkaZk);
>>
>>         Config config = new Config();
>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
>>             TIME_INTERVAL_IN_MILLIS);
>>         config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);
>>
>>         if (args != null && args.length > 2) {
>>             String name = args[2];
>>             config.setNumWorkers(4);
>>             config.setMaxTaskParallelism(4);
>>             StormSubmitter.submitTopology(name, config,
>>                 sentenceAggregationTopology.buildTopology());
>>         } else {
>>             LocalDRPC drpc = new LocalDRPC();
>>             config.setNumWorkers(2);
>>             config.setDebug(true);
>>             config.setMaxTaskParallelism(2);
>>             LocalCluster cluster = new LocalCluster();
>>             cluster.submitTopology("kafka", config,
>>                 sentenceAggregationTopology.buildTopology(drpc));
>>             while (true) {
>>                 System.out.println("Word count: " +
>>                     drpc.execute("words", "the"));
>>                 Utils.sleep(TIME_INTERVAL_IN_MILLIS);
>>             }
>>         }
>>     }
>> }
>>
>> On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <[email protected]> wrote:
>>
>>> Hi Robert,
>>>
>>> Can you provide additional details, like what storm version you are
>>> using, etc.?
>>>
>>> -Taylor
>>>
>>> > On Mar 11, 2014, at 6:57 PM, Robert Lee <[email protected]> wrote:
>>> >
>>> > After submitting my topology via the storm jar command:
>>> > ....
>>> > ....
>>> > ....
>>> > 562 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>>> > 2307 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>>> > 2307 [main] INFO backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
>>> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
>>> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
>>> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>>> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>>> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>>> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
>>> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
>>> >     ... 2 more
>>> > Caused by: java.net.SocketException: Connection reset
>>> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
>>> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>>> >     ... 7 more
>>> >
>>> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
>>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
>>> >
>>> > Thoughts on how to proceed? I tried boosting memory from 256mb to 1024mb on the nimbus and supervisor nodes with no luck. The jar file is roughly 18MB in size, and I can run another topology within the jar fine, but the one I want to run (more complex) fails.
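One more data point worth noting: the rejected frame of 2064605 bytes is roughly 2 MB, just over thrift's default 1 MB read buffer, so a sufficiently complex topology could produce a submitTopology message that trips the limit even with a well-behaved client. That would match "simple topology from the same jar works, the more complex one fails". Later Storm releases expose this cap as nimbus.thrift.max_buffer_size; whether 0.9.1-incubating honors that key is an assumption to verify, but it is a cheap test. In storm.yaml on the nimbus node:

# Assumed setting: raise thrift's max read buffer above the 1 MB default.
# Verify your Storm release supports this key before relying on it.
nimbus.thrift.max_buffer_size: 4194304  # 4 MB (default 1048576)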
