Hi Kushan,

I've blasted production Cassandra rings from Storm in the past, and I have
not had much trouble with too many open connections or anything else as long
as I clean up.  Please see the CassandraWriterBolt.java code below (for CQL
prepared statements), along with the Maven dependencies.  Hope this helps, -Dan

        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>2.1.2</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.0.5</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.2.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
            <scope>compile</scope>
        </dependency>

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.QueryTimeoutException;
import com.datastax.driver.core.policies.*;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraWriterBolt extends BaseRichBolt {

    private OutputCollector collector;
    private static final Logger logger =
            LoggerFactory.getLogger(CassandraWriterBolt.class);
    private TopologyContext context;

    private String node;
    private String key_space;
    private String cql;

    private Cluster cluster;
    private Session session;
    private PreparedStatement prepared_statement;
    private int prepared_statement_value_size;

    public CassandraWriterBolt(String node, String key_space, String cql) {
        this.node = node.trim();
        this.key_space = key_space.trim();
        this.cql = cql.trim();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //  we don't emit anything from here
    }

    private void initialize() {
        this.cluster = Cluster.builder()
                .withoutJMXReporting()
                .withoutMetrics()
                .addContactPoint(this.node)
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(
                        new ExponentialReconnectionPolicy(100L, 5000L))
                .withLoadBalancingPolicy(
                        new TokenAwarePolicy(new RoundRobinPolicy()))
                .build();

        this.session = this.cluster.connect(this.key_space);
        this.prepared_statement = this.session.prepare(this.cql)
                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
        // Number of bind values = number of '?' placeholders in the CQL.
        this.prepared_statement_value_size =
                this.cql.length() - this.cql.replace("?", "").length();
        logger.debug("Initialization of Writer Cassandra Cluster Object ["
                + this.cluster.getClusterName() + "]");
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.context = context;
        this.collector = collector;
        initialize();
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Bind only as many leading tuple values as the CQL has placeholders.
            Object[] values = input.getValues()
                    .subList(0, this.prepared_statement_value_size).toArray();
            this.session.execute(this.prepared_statement.bind(values));
            this.collector.ack(input);
        } catch (QueryTimeoutException qtex) {
            logger.warn(
                    "QueryTimeoutException: " + qtex.getMessage() +
                    "\ttime = " + System.currentTimeMillis() +
                    "\ttuple = " + input +
                    "\t Reinitializing Connection. "
            );
            cleanup();
            initialize();
            this.collector.reportError(qtex);
            this.collector.fail(input);
        } catch (Exception ex) {
            logger.error("Exception: " + ex.getMessage() + ";\t tuple = "
                    + input);
            this.collector.reportError(ex);
            this.collector.fail(input);
        }
    }

    @Override
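    public void cleanup() {
        // Guard against a partially failed initialize() leaving these null.
        if (this.session != null) {
            this.session.close();
        }
        if (this.cluster != null) {
            logger.debug("Closing Cassandra Writer Cluster Object ["
                    + this.cluster.getClusterName() + "]");
            this.cluster.close();
        }
    }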

}
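
The point of pairing cleanup() with initialize() is that every reconnect
releases the old handles before opening new ones. A minimal sketch of that
idea using only the JDK follows; FakeConnection, ReconnectingWriter, and
LifecycleDemo are hypothetical names made up for illustration and are not
part of Storm or the DataStax driver.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo driver: reconnect once, then clean up twice. */
class LifecycleDemo {
    public static void main(String[] args) {
        ReconnectingWriter writer = new ReconnectingWriter();
        writer.initialize();
        writer.initialize(); // simulated reconnect after a timeout
        System.out.println("open after reconnect: " + FakeConnection.OPEN.get());
        writer.cleanup();
        writer.cleanup(); // safe to call twice
        System.out.println("open after cleanup: " + FakeConnection.OPEN.get());
    }
}

/** Hypothetical stand-in for a Cluster/Session pair; counts open handles. */
final class FakeConnection implements AutoCloseable {
    static final AtomicInteger OPEN = new AtomicInteger();
    private boolean closed;

    FakeConnection() {
        OPEN.incrementAndGet();
    }

    @Override
    public void close() {
        // Idempotent: a second close() must not decrement the counter again.
        if (!closed) {
            closed = true;
            OPEN.decrementAndGet();
        }
    }
}

/** Hypothetical writer mirroring the bolt's initialize()/cleanup() pairing. */
final class ReconnectingWriter {
    private FakeConnection connection;

    void initialize() {
        // Release any previous handle first so a reconnect never leaks one.
        cleanup();
        connection = new FakeConnection();
    }

    void cleanup() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}
```

Running LifecycleDemo should report one open handle after the reconnect and
zero after cleanup, which is the property to check for in a real bolt.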




On Tue, Mar 3, 2015 at 4:12 PM, Kushan Maskey <
[email protected]> wrote:

> I have a bolt that inserts data into a Cassandra database. When I kill the
> topology on the test and production servers, how can I make sure that the
> Cassandra session is closed? The reason I am asking is that I sometimes get
> a "too many files open" exception, which tells me that a number of
> connections are still open. Please let me know if I am not correct.
>
> I tried adding code in cleanup() to close the sessions, but it didn't help.
>
> Thanks.
> --
> Kushan Maskey
> 817.403.7500
> M. Miller & Associates <http://mmillerassociates.com/>
> [email protected]
>
