Downgrading to storm-0.9.0.1 remedied the issue for me. I no longer receive
any exceptions. I'm fairly certain nothing is talking to the thrift port
besides the storm jar command I issue. The only thing I can think of is that
I am connected to the nimbus node to view the logs in real time. When I have
finished the other things on my plate, I'll revisit this issue and try to
find a cause.
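For anyone following along, a short sketch of why a stray non-thrift connection shows up as an absurd frame size: Thrift's TFramedTransport prefixes every message with a 4-byte big-endian length, so a server that reads raw bytes from a non-thrift peer (an ssh handshake, a port scanner) interprets those first four bytes as a frame length. The class name, payload bytes, and ssh banner below are illustrative, not taken from this cluster:

```java
import java.nio.ByteBuffer;

public class FrameSizeDemo {
    public static void main(String[] args) {
        // A well-behaved framed client: 4-byte big-endian length, then payload.
        byte[] payload = {(byte) 0x80, 0x01, 0x00, 0x01}; // start of a binary-protocol call
        ByteBuffer framed = ByteBuffer.allocate(4 + payload.length);
        framed.putInt(payload.length).put(payload).flip();
        System.out.println(framed.getInt()); // 4 -- a sane frame size

        // A non-thrift peer on the same port: the server reads the first four
        // bytes of whatever it sends (here, a hypothetical SSH banner) as a
        // frame length.
        byte[] banner = "SSH-2.0-OpenSSH_5.9\r\n".getBytes();
        int bogus = ByteBuffer.wrap(banner).getInt();
        System.out.println(bogus); // 1397966893 -- far over any cap, so rejected
    }
}
```

This is the scenario the 0.9.1 check Taylor mentions below is designed to surface.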


On Wed, Mar 12, 2014 at 2:56 PM, P. Taylor Goetz <[email protected]> wrote:

> Hi Robert, let me know if you experience the same issue with 0.9.0.1. One
> thing that caught my eye was this:
>
> 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
>
> In 0.9.1 that can indicate that something other than the thrift client
> (like ssh, telnet, or a security scanner) is accessing nimbus' thrift port.
> More details here (https://github.com/apache/incubator-storm/pull/3).
>
> The fact that you can run a different topology class from the same jar
> file suggests that it is not a problem with the upload process.
>
> Is it possible that something in your topology (like a cassandra client,
> etc.) is misconfigured to point to nimbus' host and port?
>
> - Taylor
>
> On Mar 11, 2014, at 7:48 PM, Robert Lee <[email protected]> wrote:
>
> I will downgrade to storm-0.9.0.1 and see if the error persists in that
> version as well.
>
>
> On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee <[email protected]> wrote:
>
>> Yes -- more details:
>>
>> Storm version: 0.9.1-incubating installed using a variant of your
>> storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
>>
>> Cluster setup: two supervisor nodes with 1024MB each, nimbus with 1024MB, a
>> ZooKeeper (3.3.5) node with 512MB, and a Kafka (0.8.0) node with 512MB.
>> Persisting to a local Cassandra cluster.
>>
>> Here's an example topology I'm running. This topology works both in local
>> and distributed mode. A variant of this topology (more persisting and more
>> complicated functions on the kafka stream) works in local mode but gives
>> the thrift error reported above when submitting.
>>
>> public class SentenceAggregationTopology {
>>
>>     private final BrokerHosts brokerHosts;
>>
>>     public SentenceAggregationTopology(String kafkaZookeeper) {
>>         brokerHosts = new ZkHosts(kafkaZookeeper);
>>     }
>>
>>     public StormTopology buildTopology() {
>>         return buildTopology(null);
>>     }
>>
>>     public StormTopology buildTopology(LocalDRPC drpc) {
>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>         TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
>>         KafkaSentenceMapper mapper = new KafkaSentenceMapper("playlist", "testtable", "word", "count");
>>         TridentTopology topology = new TridentTopology();
>>
>>         TridentState wordCounts = topology.newStream("kafka", kafkaSpout)
>>                 .shuffle()
>>                 .each(new Fields("str"), new WordSplit(), new Fields("word"))
>>                 .groupBy(new Fields("word"))
>>                 .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
>>                         new Count(), new Fields("aggregates_words"))
>>                 .parallelismHint(2);
>>
>>         topology.newDRPCStream("words", drpc)
>>                 .each(new Fields("args"), new Split(), new Fields("word"))
>>                 .groupBy(new Fields("word"))
>>                 .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
>>                 .each(new Fields("count"), new FilterNull())
>>                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
>>
>>         return topology.build();
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         final int TIME_INTERVAL_IN_MILLIS = 1000;
>>
>>         String kafkaZk = args[0];
>>         SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk);
>>
>>         Config config = new Config();
>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, TIME_INTERVAL_IN_MILLIS);
>>         config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);
>>
>>         if (args.length > 2) {
>>             String name = args[2];
>>             config.setNumWorkers(4);
>>             config.setMaxTaskParallelism(4);
>>             StormSubmitter.submitTopology(name, config, sentenceAggregationTopology.buildTopology());
>>         } else {
>>             LocalDRPC drpc = new LocalDRPC();
>>             config.setNumWorkers(2);
>>             config.setDebug(true);
>>             config.setMaxTaskParallelism(2);
>>             LocalCluster cluster = new LocalCluster();
>>             cluster.submitTopology("kafka", config, sentenceAggregationTopology.buildTopology(drpc));
>>             while (true) {
>>                 System.out.println("Word count: " + drpc.execute("words", "the"));
>>                 Utils.sleep(TIME_INTERVAL_IN_MILLIS);
>>             }
>>         }
>>     }
>> }
>>
>>
>> On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <[email protected]> wrote:
>>
>>> Hi Robert,
>>>
>>> Can you provide additional details, like what storm version you are
>>> using, etc.?
>>>
>>> -Taylor
>>>
>>> > On Mar 11, 2014, at 6:57 PM, Robert Lee <[email protected]> wrote:
>>> >
>>> >
>>> >
>>> > After submitting my topology via the storm jar command:
>>> > ....
>>> > ....
>>> > ....
>>> > 562  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
>>> > 2307 [main] INFO  backtype.storm.StormSubmitter - Submitting topology test in distributed mode with conf {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
>>> > Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
>>> >     at com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
>>> > Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Connection reset
>>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>>> >     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>>> >     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>>> >     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
>>> >     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
>>> >     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
>>> >     ... 2 more
>>> > Caused by: java.net.SocketException: Connection reset
>>> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
>>> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>>> >     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>>> >     ... 7 more
>>> >
>>> >
>>> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
>>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from client: storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
>>> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections.
>>> >
>>> > Thoughts on how to proceed? I tried boosting memory from 256MB to 1024MB on the nimbus and supervisor nodes with no luck. The jar file is roughly 18MB in size, and I can run another topology within the jar fine, but the one I want to run (more complex) fails.
>>>
>>
>>
>
>
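A side note on the numbers in the thread, heavily hedged: the rejected frame of 2064605 bytes is just under 2 MB, which would exceed a 1 MB per-connection read-buffer cap even though the 18 MB jar uploads fine (the jar is streamed in smaller chunks, not sent as a single frame). The 1 MB cap here is an assumption about this build's TNonblockingServer default, not a value stated anywhere in the thread:

```java
public class FrameSizeCheck {
    public static void main(String[] args) {
        // Frame size nimbus' TNonblockingServer rejected, from the log above.
        int frameSize = 2_064_605;
        // Assumed 1 MB per-connection read-buffer cap (an assumption, not a
        // value quoted in the thread).
        int assumedMaxReadBufferBytes = 1024 * 1024; // 1_048_576
        System.out.printf("frame = %.2f MB, over assumed cap: %b%n",
                frameSize / (1024.0 * 1024.0),
                frameSize > assumedMaxReadBufferBytes);
    }
}
```

If that assumption holds, it would fit the symptom that a simpler topology from the same jar submits fine while the more complex one fails.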
