This might be useful to you as well:
Netty Issue (I'm using Storm-Kafka and Kafka as well):
rm /opt/programs/storm/lib/netty-3.2.2.Final.jar
CassandraWriterBolt Topology implementation:
CassandraWriterBolt cassandraWriterImpressionsBolt = new
CassandraWriterBolt(
properties.getProperty("cassandra.host"),
properties.getProperty("cassandra.host.keyspace"),
properties.getProperty("cassandra.cql.writer.impressions")
);
CassandraWriterBolt.java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.*;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* CassandraWriterBolt.java
* CQL Prepared Statement Writer to Cassandra Cluster as Storm Bolt.
*
* @author Dan DeCapria, Copyright (c) 2011-2014
* @since 19 August 2014
* @version 0.1, 26 August 2014
*/
public class CassandraWriterBolt extends BaseRichBolt {
private OutputCollector collector;
private static final Logger logger =
LoggerFactory.getLogger(CassandraWriterBolt.class);
private TopologyContext context;
private String node;
private String key_space;
private String cql;
private Cluster cluster;
private Session session;
private PreparedStatement prepared_statement;
public CassandraWriterBolt(String node, String key_space, String cql) {
this.node = node;
this.key_space = key_space;
this.cql = cql;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// we don't emit anything from here
}
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.context = context;
this.collector = collector;
this.cluster = Cluster.builder()
.withoutJMXReporting()
.withoutMetrics()
.addContactPoint(this.node)
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withReconnectionPolicy(new
ExponentialReconnectionPolicy(100L, 5000L))
.withLoadBalancingPolicy(new TokenAwarePolicy(new
RoundRobinPolicy()))
.build();
this.session = this.cluster.connect(this.key_space);
this.prepared_statement = this.session.prepare(this.cql);
}
@Override
public void execute(Tuple input) {
try {
this.session.execute(this.prepared_statement.bind(input.getValues().toArray()));
} catch (Exception ex) {
logger.error("Exception: " + ex.getMessage() + ";\t tuple = " +
input);
this.collector.reportError(ex);
this.collector.fail(input);
}
this.collector.ack(input);
}
@Override
public void cleanup() {
this.session.close();
this.cluster.close();
}
}
POM snippet:
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>2.0.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.0.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.2.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.hmsonline</groupId>
<artifactId>storm-cassandra-cql</artifactId>
<version>0.1.8</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
On Thu, Sep 25, 2014 at 2:27 PM, Strulovitch, Zack <[email protected]
> wrote:
> Thank you Robert and Harsha. I like Robert's suggestion since it was
> updated recently.
>
>
> *From:* Robert Lee [[email protected]]
> *Sent:* Thursday, September 25, 2014 2:24 PM
> *To:* [email protected]
> *Subject:* Re: Cassandra bolt
>
> If you are looking for CQL, I'd suggest:
>
> https://github.com/hmsonline/storm-cassandra-cql
>
> On Thu, Sep 25, 2014 at 2:22 PM, Harsha <[email protected]> wrote:
>
>> did you tried https://github.com/ptgoetz/storm-cassandra.
>>
>>
>> On Thu, Sep 25, 2014, at 11:20 AM, Strulovitch, Zack wrote:
>>
>> I've updated to 0.9.2 from pre-apache version 0.9.0.1 (which broke my
>> Cassandra bolt implemented using this code :
>> https://github.com/tjake/stormscraper
>> https://github.com/tjake/stormscraper )
>> According to some posts, this is due to netty conflict. Could anyone
>> please suggest me an alternative reliable Cassandra bolt implementation?
>> Thank you in advance,
>> Zack
>>
>>
>> ------------------------------
>>
>> This e-mail contains privileged and confidential information intended
>> for the use of the addressees named above. If you are not the intended
>> recipient of this e-mail, you are hereby notified that you must not
>> disseminate, copy or take any action in respect of any information
>> contained in it. If you have received this e-mail in error, please notify
>> the sender immediately by e-mail and immediately destroy this e-mail and
>> its attachments.
>>
>>
>>
>
>
> ------------------------------
>
> This e-mail contains privileged and confidential information intended for
> the use of the addressees named above. If you are not the intended
> recipient of this e-mail, you are hereby notified that you must not
> disseminate, copy or take any action in respect of any information
> contained in it. If you have received this e-mail in error, please notify
> the sender immediately by e-mail and immediately destroy this e-mail and
> its attachments.
>
--
Dan DeCapria
CivicScience, Inc.
Back-End Data IS/BI/DM/ML Specialist