The storm-cassandra project (https://github.com/hmsonline/storm-cassandra) is actively maintained (DISCLAIMER: I’m the original author), so I would give that one a try. It’s also aligned with the latest version of storm.
Also, which version of storm are you using?

- Taylor

On Feb 3, 2014, at 5:34 PM, Adrian Mocanu <[email protected]> wrote:

> It’s from trident.cassandra
> libraryDependencies += "trident-cassandra" %"trident-cassandra" % "0.0.1-wip2"
>
> but I modified the file and removed an IMetrics parameter (it seems I have 2
> interfaces for it in my code so during runtime it would think it was the
> other interface which did not have that param and crashed) Anyhow, I saw that
> there is another version of CassandraState in hmsonline project
> (libraryDependencies += "com.hmsonline" % "storm-cassandra" % "0.4.0-rc4")
> which is very different. I did not use that one – I thought that was the
> older version.
>
> Thanks
> A
>
> From: P. Taylor Goetz [mailto:[email protected]]
> Sent: February-03-14 5:21 PM
> To: [email protected]
> Subject: Re: trident simplest aggregator into cassandra
>
> Which project did the CassandraState implementation come from?
>
>
> On Feb 3, 2014, at 5:09 PM, Adrian Mocanu <[email protected]> wrote:
>
>
> Hi
> I'm using Trident to perform some aggregations and store the results into
> cassandra.
>
> I've looked at IBackingMap and specifically at some tutorials on trident site
> and I've tried using CassandraState which I found online in some repository.
> After creating what I thought were column family and keys corresponding to
> the code I still cannot figure out how to run the sample topology and not
> crash due to some Cassandra schema error (InvalidRequestException(why:Invalid
> cell for CQL3 table state. The CQL3 column component (over) does not
> correspond to a defined CQL3 column).
>
> Here is the sample code I use:
>
> val cassandraStateFactory:StateFactory =
>   chat.CassandraState.transactional("10.10.6.80")
>
> val spout = new FixedBatchSpout(new Fields("sentence"), 3,
>   new Values("the cow jumped over the moon"),
>   new Values("the man went to the store and bought some candy"),
>   new Values("four score and seven years ago"),
>   new Values("how many apples can you eat"))
> spout.setCycle(true)
>
> val wordCounts :TridentState= tridentBuilder.newStream("spout1", spout)
>   .each(new Fields("sentence"), new Split(), new Fields("word"))
>   .groupBy(new Fields("word"))
>   .persistentAggregate(cassandraStateFactory, new Count(), new Fields("count"))
>   .parallelismHint(6)
>
> val cluster = new LocalCluster();
> val config = new Config();
> config.setMaxSpoutPending(100);
> config.setMaxSpoutPending(25);
> cluster.submitTopology("test", config, tridentBuilder.build());
>
>
> What is the schema needed to run this example (it also uses CassandraState)?
>
> thanks
> A
