I will downgrade to storm-0.9.0.1 and see if the error persists in that version as well.
On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee <[email protected]> wrote:
> Yes -- more details:
>
> Storm version: 0.9.1-incubating, installed using a variant of your
> storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
>
> Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m, a
> zookeeper (3.3.5) node with 512mb, and a kafka (0.8.0) node with 512mb,
> persisting to a local cassandra cluster.
>
> Here's an example topology I'm running. This topology works in both local
> and distributed mode. A variant of it (more persistence and more
> complicated functions on the kafka stream) works in local mode but gives
> the thrift error reported above when submitted.
>
> public class SentenceAggregationTopology {
>
>     private final BrokerHosts brokerHosts;
>
>     public SentenceAggregationTopology(String kafkaZookeeper) {
>         brokerHosts = new ZkHosts(kafkaZookeeper);
>     }
>
>     public StormTopology buildTopology() {
>         return buildTopology(null);
>     }
>
>     public StormTopology buildTopology(LocalDRPC drpc) {
>         TridentKafkaConfig kafkaConfig =
>                 new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         TransactionalTridentKafkaSpout kafkaSpout =
>                 new TransactionalTridentKafkaSpout(kafkaConfig);
>         KafkaSentenceMapper mapper =
>                 new KafkaSentenceMapper("playlist", "testtable", "word", "count");
>         TridentTopology topology = new TridentTopology();
>
>         // Persistent word-count state built from the kafka stream.
>         TridentState wordCounts = topology.newStream("kafka", kafkaSpout)
>                 .shuffle()
>                 .each(new Fields("str"), new WordSplit(), new Fields("word"))
>                 .groupBy(new Fields("word"))
>                 .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
>                         new Count(), new Fields("aggregates_words"))
>                 .parallelismHint(2);
>
>         // DRPC query stream over that state.
>         topology.newDRPCStream("words", drpc)
>                 .each(new Fields("args"), new Split(), new Fields("word"))
>                 .groupBy(new Fields("word"))
>                 .stateQuery(wordCounts, new Fields("word"), new MapGet(),
>                         new Fields("count"))
>                 .each(new Fields("count"), new FilterNull())
>                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
>
>         return topology.build();
>     }
>
>     public static void main(String[] args) throws Exception {
>         final int TIME_INTERVAL_IN_MILLIS = 1000;
>
>         String kafkaZk = args[0];
>         SentenceAggregationTopology sentenceAggregationTopology =
>                 new SentenceAggregationTopology(kafkaZk);
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
>                 TIME_INTERVAL_IN_MILLIS);
>         config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);
>
>         if (args != null && args.length > 2) {
>             String name = args[2];
>             config.setNumWorkers(4);
>             config.setMaxTaskParallelism(4);
>             StormSubmitter.submitTopology(name, config,
>                     sentenceAggregationTopology.buildTopology());
>         } else {
>             LocalDRPC drpc = new LocalDRPC();
>             config.setNumWorkers(2);
>             config.setDebug(true);
>             config.setMaxTaskParallelism(2);
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafka", config,
>                     sentenceAggregationTopology.buildTopology(drpc));
>             while (true) {
>                 System.out.println("Word count: " + drpc.execute("words", "the"));
>                 Utils.sleep(TIME_INTERVAL_IN_MILLIS);
>             }
>         }
>     }
> }
>
>
> On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <[email protected]> wrote:
>
>> Hi Robert,
>>
>> Can you provide additional details, like what storm version you are
>> using, etc.?
>>
>> -Taylor
>>
>> > On Mar 11, 2014, at 6:57 PM, Robert Lee <[email protected]> wrote:
>> >
>> > After submitting my topology via the storm jar command:
>> > ....
>> > ....
>> > ....
>> > 562  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
>> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
>> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
>> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
>> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
>> >     ... 2 more
>> > Caused by: java.net.SocketException: Connection reset
>> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
>> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>> >     ... 7 more
>> >
>> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
>> >
>> > Thoughts on how to proceed? I tried boosting memory from 256mb to 1024mb on the nimbus and supervisor nodes with no luck. The jar file is roughly 18MB in size, and I can run another topology within the jar fine, but the one I want to run (more complex) fails.
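
For what it's worth, the nimbus log line above points at Thrift's frame-size cap rather than JVM heap: TNonblockingServer rejects any incoming frame larger than its read buffer, so boosting node memory would not change the outcome. One way to confirm that the serialized topology itself (not the 18MB jar, which uploads separately and successfully) is the oversized payload is to measure it with the shaded Thrift classes bundled in storm-core. A minimal sketch, assuming storm-core 0.9.x on the classpath; TopologySizeCheck is a hypothetical helper, and SentenceAggregationTopology is the class from the thread above:

    import backtype.storm.generated.StormTopology;
    import org.apache.thrift7.TSerializer;  // thrift shaded into storm-core 0.9.x

    public class TopologySizeCheck {
        public static void main(String[] args) throws Exception {
            // Build the topology exactly as it would be submitted.
            StormTopology topology =
                    new SentenceAggregationTopology(args[0]).buildTopology();

            // StormSubmitter sends the topology as a Thrift struct over a
            // framed transport; its binary-protocol size approximates the
            // frame that tripped the 2064605-byte check on nimbus.
            byte[] bytes = new TSerializer().serialize(topology);
            System.out.println("Serialized topology: " + bytes.length + " bytes");
        }
    }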

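If the serialized topology really is past the cap (on the order of 1 MB by default in 0.9.x builds), the usual remedy is to raise the server-side buffer rather than shrink the topology. This is a sketch only: it assumes a Storm release that exposes the nimbus.thrift.max_buffer_size key (later 0.9.x releases do; older builds hard-code the limit, in which case upgrading is the fix). Set it in storm.yaml on the nimbus host and restart nimbus:

    # storm.yaml on the nimbus node -- assumes this key exists in your release.
    # Value is in bytes; 20 MB shown, comfortably above the 2064605-byte frame
    # reported in the nimbus log.
    nimbus.thrift.max_buffer_size: 20480000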