Yes -- more details:
Storm version: 0.9.1-incubating, installed using a variant of your
storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).
Cluster setup: two supervisor nodes with 1024 MB each, nimbus with 1024 MB,
a ZooKeeper (3.3.5) node with 512 MB, and a Kafka (0.8.0) node with 512 MB.
Persisting to a local Cassandra cluster.
Here's an example topology I'm running. It works in both local and
distributed mode. A variant of it (more persistence and more complicated
functions on the Kafka stream) works in local mode but gives the Thrift
error reported above when submitting.
public class SentenceAggregationTopology {

    private final BrokerHosts brokerHosts;

    public SentenceAggregationTopology(String kafkaZookeeper) {
        brokerHosts = new ZkHosts(kafkaZookeeper);
    }

    public StormTopology buildTopology() {
        return buildTopology(null);
    }

    public StormTopology buildTopology(LocalDRPC drpc) {
        TridentKafkaConfig kafkaConfig =
                new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout =
                new TransactionalTridentKafkaSpout(kafkaConfig);
        KafkaSentenceMapper mapper =
                new KafkaSentenceMapper("playlist", "testtable", "word", "count");
        TridentTopology topology = new TridentTopology();

        TridentState wordCounts = topology.newStream("kafka", kafkaSpout)
                .shuffle()
                .each(new Fields("str"), new WordSplit(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(CassandraBackingMap.nonTransactional(mapper),
                        new Count(), new Fields("aggregates_words"))
                .parallelismHint(2);

        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(),
                        new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        final int TIME_INTERVAL_IN_MILLIS = 1000;
        String kafkaZk = args[0];
        SentenceAggregationTopology sentenceAggregationTopology =
                new SentenceAggregationTopology(kafkaZk);
        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
                TIME_INTERVAL_IN_MILLIS);
        config.put(Configuration.CASSANDRA_CQL_HOSTS_KEY, args[1]);

        if (args.length > 2) {
            String name = args[2];
            config.setNumWorkers(4);
            config.setMaxTaskParallelism(4);
            StormSubmitter.submitTopology(name, config,
                    sentenceAggregationTopology.buildTopology());
        } else {
            LocalDRPC drpc = new LocalDRPC();
            config.setNumWorkers(2);
            config.setDebug(true);
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config,
                    sentenceAggregationTopology.buildTopology(drpc));
            while (true) {
                System.out.println("Word count: " + drpc.execute("words", "the"));
                Utils.sleep(TIME_INTERVAL_IN_MILLIS);
            }
        }
    }
}
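One thing I'm planning to try, based on the frame-size error in the nimbus log
quoted below (a 2064605-byte frame rejected as over the buffer limit): raising
nimbus's Thrift buffer limit in storm.yaml on the nimbus node. This is untested
on my cluster, and the setting name and 1 MB default are what I've seen in the
0.9.x shipped defaults, so treat it as a guess:

```yaml
# storm.yaml on the nimbus node (restart nimbus after changing).
# Default is 1048576 (1 MB); the rejected frame was ~2 MB,
# so 4 MB leaves some headroom for the larger topology.
nimbus.thrift.max_buffer_size: 4194304
```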
On Tue, Mar 11, 2014 at 7:33 PM, P. Taylor Goetz <[email protected]> wrote:
> Hi Robert,
>
> Can you provide additional details, like what storm version you are using,
> etc.?
>
> -Taylor
>
> > On Mar 11, 2014, at 6:57 PM, Robert Lee <[email protected]> wrote:
> >
> >
> >
> > After submitting my topology via the storm jar command:
> > ....
> > ....
> > ....
> > 562 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar
> storm-kafka-cassandra-0.1.0-SNAPSHOT-jar-with-dependencies.jar to assigned
> location:
> storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded
> topology jar to assigned location:
> storm-local/nimbus/inbox/stormjar-6926fc83-24d1-4b79-81c6-a3dec6dca9d9.jar
> > 2307 [main] INFO backtype.storm.StormSubmitter - Submitting topology
> test in distributed mode with conf
> {"topology.max.task.parallelism":4,"topology.workers":2,"topology.debug":true,"topology.trident.batch.emit.interval.millis":5000,}
> > Exception in thread "main" java.lang.RuntimeException:
> org.apache.thrift7.transport.TTransportException: java.net.SocketException:
> Connection reset
> > at
> backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:95)
> > at
> backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:41)
> > at
> com.sentaware.sentalytics.storm.trident.TwitterTopology.main(TwitterTopology.java:180)
> > Caused by: org.apache.thrift7.transport.TTransportException:
> java.net.SocketException: Connection reset
> > at
> org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
> > at
> org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
> > at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
> > at
> backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:139)
> > at
> backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:128)
> > at
> backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:81)
> > ... 2 more
> > Caused by: java.net.SocketException: Connection reset
> > at
> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
> > at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
> > at
> org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
> > ... 7 more
> >
> >
> > storm@nimbus:~$ tail /var/log/storm/nimbus.log
> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to
> storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:31 b.s.d.nimbus [INFO] Uploading file from client to
> storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from
> client:
> storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 b.s.d.nimbus [INFO] Finished uploading file from
> client:
> storm-local/nimbus/inbox/stormjar-e7bc17a5-3bf8-4bd1-8860-385ef105f827.jar
> > 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size
> of 2064605, which is bigger than the maximum allowable buffer size for ALL
> connections.
> >
> > Thoughts on how to proceed? I tried boosting memory from 256mb to 1024mb
> on the nimbus and supervisor nodes with no luck. The jar file is roughly
> 18MB in size and I can run another topology within the jar fine but the one
> I want to run (more complex) fails.
>