I will downgrade to storm-0.9.0.1 and see if the error persists in that
version as well.
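
In the meantime, the nimbus log below suggests this is not a memory problem:
nimbus read a 2064605-byte frame and rejected it as bigger than its buffer, so
the serialized topology is simply too large for the Thrift frame limit, which
would also explain why the smaller topology in the same jar submits fine. If
your Storm build supports the nimbus.thrift.max_buffer_size setting in
storm.yaml (I am not certain 0.9.1-incubating honors it, so treat that as an
assumption to verify), raising it above the reported frame size on the nimbus
host and restarting nimbus may be enough.

To check how close a topology is to the limit before submitting, here is a
minimal sketch that serializes the topology with the shaded Thrift serializer
(the org.apache.thrift7 package name matches the stack trace below).
TopologySizeCheck is a hypothetical helper name of mine, and it reuses the
SentenceAggregationTopology class from the code below:

import backtype.storm.generated.StormTopology;
import org.apache.thrift7.TSerializer;

public class TopologySizeCheck {
    public static void main(String[] args) throws Exception {
        // Build the same topology that fails to submit (args[0] is the
        // Kafka ZooKeeper connect string, as in the main class below).
        SentenceAggregationTopology t = new SentenceAggregationTopology(args[0]);
        StormTopology topology = t.buildTopology();

        // StormTopology is a Thrift struct, so serializing it with the
        // default binary protocol approximates the frame that the submit
        // call sends to nimbus. Compare this number against the
        // 2064605-byte frame reported in the nimbus log.
        byte[] bytes = new TSerializer().serialize(topology);
        System.out.println("Thrift-serialized topology: " + bytes.length + " bytes");
    }
}

If that number is over the buffer limit, trimming what gets serialized into
the topology (or raising the buffer as above) seems like a more direct fix
than adding memory to the nodes.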


On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee <[email protected]> wrote:

> Yes -- more details:
>
> Storm version: 0.9.1-incubating installed using a variant of your
> storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
>
> Cluster setup: two supervisor nodes with 1024 MB each, a nimbus node with
> 1024 MB, a ZooKeeper (3.3.5) node with 512 MB, and a Kafka (0.8.0) node with
> 512 MB. Persisting to a local Cassandra cluster.
>
> Here's an example topology I'm running. It works in both local and
> distributed mode. A variant of it (with more persistence and more
> complicated functions on the Kafka stream) works in local mode but gives
> the Thrift error reported below when submitting to the cluster.
>
> public class SentenceAggregationTopology {
>
>     private final BrokerHosts brokerHosts;
>
>     public SentenceAggregationTopology(String kafkaZookeeper) {
>         brokerHosts = new ZkHosts(kafkaZookeeper);
>     }
>
>     public StormTopology buildTopology() {
>         return buildTopology(null);
>     }
>
>     public StormTopology buildTopology(LocalDRPC drpc) {
>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
>         KafkaSentenceMapper mapper = new KafkaSentenceMapper("playlist", "testtable", "word", "count");
>         TridentTopology topology = new TridentTopology();
>
>         TridentState wordCounts = topology.newStream("kafka", kafkaSpout)
>                 .shuffle()
>                 .each(new Fields("str"), new WordSplit(), new Fields("word"))
>                 .groupBy(new Fields("word"))
>                 .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
>                         new Count(), new Fields("aggregates_words"))
>                 .parallelismHint(2);
>
>         topology.newDRPCStream("words", drpc)
>                 .each(new Fields("args"), new Split(), new Fields("word"))
>                 .groupBy(new Fields("word"))
>                 .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
>                 .each(new Fields("count"), new FilterNull())
>                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
>
>         return topology.build();
>     }
>
>     public static void main(String[] args) throws Exception {
>         final int TIME_INTERVAL_IN_MILLIS = 1000;
>
>         String kafkaZk = args[0];
>         SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk);
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, TIME_INTERVAL_IN_MILLIS);
>         config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);
>
>         if (args != null && args.length > 2) {
>             String name = args[2];
>             config.setNumWorkers(4);
>             config.setMaxTaskParallelism(4);
>             StormSubmitter.submitTopology(name, config, sentenceAggregationTopology.buildTopology());
>         } else {
>             LocalDRPC drpc = new LocalDRPC();
>             config.setNumWorkers(2);
>             config.setDebug(true);
>             config.setMaxTaskParallelism(2);
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafka", config, sentenceAggregationTopology.buildTopology(drpc));
>             while (true) {
>                 System.out.println("Word count: " + drpc.execute("words", "the"));
>                 Utils.sleep(TIME_INTERVAL_IN_MILLIS);
>             }
>         }
>     }
> }
>
>
> On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <[email protected]> wrote:
>
>> Hi Robert,
>>
>> Can you provide additional details, such as which Storm version you are
>> using?
>>
>> -Taylor
>>
>> > On Mar 11, 2014, at 6:57 PM, Robert Lee <[email protected]> wrote:
>> >
>> >
>> >
>> > After submitting my topology via the storm jar command:
>> > ....
>> > ....
>> > ....
>> > 562  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
>> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
>> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
>> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
>> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
>> >     ... 2 more
>> > Caused by: java.net.SocketException: Connection reset
>> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
>> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>> >     ... 7 more
>> >
>> >
>> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
>> >
>> > Thoughts on how to proceed? I tried boosting memory from 256 MB to
>> 1024 MB on the nimbus and supervisor nodes with no luck. The jar is roughly
>> 18 MB, and I can run another topology from the same jar fine, but the more
>> complex one I want to run fails.
>>
>
>
