Thank Dan, I implemented my code as per you suggestions. But I still need to wait and see if it is working as expected.
If there is any other way anyone has experienced would be highly appreciated. -- Kushan Maskey 817.403.7500 M. Miller & Associates <http://mmillerassociates.com/> [email protected] On Tue, Mar 3, 2015 at 3:25 PM, Dan DeCapria, CivicScience < [email protected]> wrote: > Hi Kushan, > > I've blasted production Cassandra rings in the past from Storm, and I have > not had much difficulty with too many open connections or anything else if > I cleanup. Please cf the CassandraWriterBolt.java code below (for CQL > prepared statements). Hope this helps, -Dan > > <dependency> > <groupId>com.datastax.cassandra</groupId> > <artifactId>cassandra-driver-core</artifactId> > <version>2.1.2</version> > <scope>compile</scope> > <exclusions> > <exclusion> > <artifactId>guava</artifactId> > <groupId>com.google.guava</groupId> > </exclusion> > </exclusions> > </dependency> > <dependency> > <groupId>org.xerial.snappy</groupId> > <artifactId>snappy-java</artifactId> > <version>1.0.5</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>net.jpountz.lz4</groupId> > <artifactId>lz4</artifactId> > <version>1.2.0</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>com.google.guava</groupId> > <artifactId>guava</artifactId> > <version>18.0</version> > <scope>compile</scope> > </dependency> > > import backtype.storm.task.OutputCollector; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.topology.base.BaseRichBolt; > import backtype.storm.tuple.Tuple; > import com.datastax.driver.core.*; > import com.datastax.driver.core.exceptions.QueryTimeoutException; > import com.datastax.driver.core.policies.*; > import java.util.Map; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > public class CassandraWriterBolt extends BaseRichBolt { > > private OutputCollector collector; > private static final Logger logger = > LoggerFactory.getLogger(CassandraWriterBolt.class); > private TopologyContext context; > > private String node; > private String key_space; > private String cql; > > private Cluster cluster; > private Session session; > private PreparedStatement prepared_statement; > private int prepared_statement_value_size; > > public CassandraWriterBolt(String node, String key_space, String cql) { > this.node = node.trim(); > this.key_space = key_space.trim(); > this.cql = cql.trim(); > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) { > // we don't emit anything from here > } > > private void initialize() { > this.cluster = Cluster.builder() > .withoutJMXReporting() > .withoutMetrics() > .addContactPoint(this.node) > > .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) > .withReconnectionPolicy(new > ExponentialReconnectionPolicy(100L, 5000L)) > .withLoadBalancingPolicy(new TokenAwarePolicy(new > RoundRobinPolicy())) > .build(); > > this.session = this.cluster.connect(this.key_space); > this.prepared_statement = > this.session.prepare(this.cql).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); > this.prepared_statement_value_size = this.cql.length() - > this.cql.replace("?", "").length(); > logger.debug("Initialization of Writer Cassandra Cluster Object [" > + this.cluster.getClusterName() + "]"); > } > > @Override > public void prepare(Map stormConf, TopologyContext context, > OutputCollector collector) { > this.context = context; > this.collector = collector; > initialize(); > } > > @Override > public void execute(Tuple input) { > try { > > this.session.execute(this.prepared_statement.bind(input.getValues().subList(0, > this.prepared_statement_value_size).toArray())); > this.collector.ack(input); > } catch (QueryTimeoutException qtex) { > logger.warn( > "QueryTimeoutException: " + qtex.getMessage() + > "\ttime = " + System.currentTimeMillis() + > "\ttuple = " + input + > "\t Reinitializing Connection. " > ); > cleanup(); > initialize(); > this.collector.reportError(qtex); > this.collector.fail(input); > } catch (Exception ex) { > logger.error("Exception: " + ex.getMessage() + ";\t tuple = " > + input); > this.collector.reportError(ex); > this.collector.fail(input); > } > } > > @Override > public void cleanup() { > logger.debug("Closing Cassandra Writer Cluster Object [" + > this.cluster.getClusterName() + "]"); > this.session.close(); > this.cluster.close(); > } > > } > > > > > On Tue, Mar 3, 2015 at 4:12 PM, Kushan Maskey < > [email protected]> wrote: > >> I have bolt that inserts data into Cassandra database. When I kill the >> topology on test and production server, how can I make sure that the >> Cassandra session. The reasons I am asking is sometime I get too many files >> open exception thrown which tells me that there are number of open >> connections. Please let me know if I am not correct. >> >> I try to add code in the cleanup() to go and close the sessions didnt >> help. >> >> Thanks. >> -- >> Kushan Maskey >> 817.403.7500 >> M. Miller & Associates <http://mmillerassociates.com/> >> [email protected] >> > > >
