Repository: storm
Updated Branches:
  refs/heads/master 13c33f335 -> 0c2b3a4f6


Merge branch 'STORM-821' of github.com:Parth-Brahmbhatt/incubator-storm into 0.10.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05d1f8b2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05d1f8b2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05d1f8b2

Branch: refs/heads/master
Commit: 05d1f8b2451a0f958ff079699862c219ddb26833
Parents: 13c33f3
Author: P. Taylor Goetz <ptgo...@gmail.com>
Authored: Fri May 29 15:41:12 2015 -0400
Committer: P. Taylor Goetz <ptgo...@gmail.com>
Committed: Fri May 29 15:47:08 2015 -0400

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 72 +++++++++++++++-----
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       | 17 +++--
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |  5 +-
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |  5 +-
 .../org/apache/storm/jdbc/common/Column.java    |  7 +-
 .../storm/jdbc/common/ConnectionProvider.java   | 26 +++++++
 .../jdbc/common/HikariCPConnectionProvider.java | 46 +++++++++++++
 .../apache/storm/jdbc/common/JdbcClient.java    | 19 ++----
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  6 +-
 .../storm/jdbc/trident/state/JdbcState.java     | 13 ++--
 .../storm/jdbc/common/JdbcClientTest.java       |  5 +-
 .../jdbc/topology/AbstractUserTopology.java     | 17 +++--
 .../jdbc/topology/UserPersistanceTopology.java  | 18 ++++-
 .../UserPersistanceTridentTopology.java         |  2 +-
 14 files changed, 196 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index ef7845a..2d1301a 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -1,10 +1,37 @@
 #Storm JDBC
 Storm/Trident integration for JDBC.
 This package includes the core bolts and trident states that allows a storm topology
 to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples
-in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+in a storm topology.
 
 ## Inserting into a database.
 The bolt and trident state included in this package for inserting data into a database tables are tied to a single table.
+
+### ConnectionProvider
+An interface that should be implemented by different connection pooling mechanism `org.apache.storm.jdbc.common.ConnectionPrvoider`
+
+```java
+public interface ConnectionPrvoider extends Serializable {
+    /**
+     * method must be idempotent.
+     */
+    void prepare();
+
+    /**
+     *
+     * @return a DB connection over which the queries can be executed.
+     */
+    Connection getConnection();
+
+    /**
+     * called once when the system is shutting down, should be idempotent.
+     */
+    void cleanup();
+}
+```
+
+Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an implementation that uses HikariCP.
+
+###JdbcMapper
 The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
 
 ```java
@@ -17,11 +44,12 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re
 **The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list.**
 For example if the user supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())` the 1st item
 of the returned list of `getColumns` method will map to the 1st place holder and the 2nd to the 2nd and so on. We do not parse
-the supplied queries to try and resolve place holder by column names.
+the supplied queries to try and resolve place holder by column names. Not making any assumptions about the query syntax allows this connector
+to be used by some non-standard sql frameworks like Pheonix which only supports upsert into.
 
 ### JdbcInsertBolt
-To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the
-hikari configuration map and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply
+To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation
+and a `JdbcMapper` implementation that converts storm tuple to DB row. In addition, you must either supply
 a table name using `withTableName` method or an insert query using `withInsertQuery`.
 If you specify a insert query you should ensure that your `JdbcMapper` implementation will return a list of columns in the same order as in your insert query.
 You can optionally specify a query timeout seconds param that specifies max seconds an insert query can take.
@@ -29,13 +57,21 @@ The default is set to value of topology.message.timeout.secs and a value of -1 w
 You should set the query timeout value to be <= topology.message.timeout.secs.
 
 ```java
-Config config = new Config();
-config.put("jdbc.conf", hikariConfigMap);
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
+
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
+
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                     .withTableName("user")
                                     .withQueryTimeoutSecs(30);
 Or
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                     .withInsertQuery("insert into user values (?,?)")
                                     .withQueryTimeoutSecs(30);
 ```
@@ -45,7 +81,7 @@ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMa
 tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in
 the database table that you intend to write to.
 
-To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a connectionProvider instance.
 
 The following code creates a `SimpleJdbcMapper` instance that:
 
@@ -60,8 +96,9 @@ hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDa
 hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
 hikariConfigMap.put("dataSource.user","root");
 hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
 String tableName = "user_details";
-JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
 ```
 The mapper initialized in the example above assumes a storm tuple has value for all the columns of the table you intend to insert data into and its `getColumn`
 method will return the columns in the order in which Jdbc connection instance's `connection.getMetaData().getColumns();` method returns them.
@@ -89,12 +126,12 @@ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
 ```
 ### JdbcTridentState
 We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
-state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
-hikari configuration map. See the example below:
+state you need to initialize it with the table name or an insert query, the JdbcMapper instance and connection provider instance.
+See the example below:
 
 ```java
 JdbcState.Options options = new JdbcState.Options()
-        .withConfigKey("jdbc.conf")
+        .withConnectionProvider(connectionProvider)
         .withMapper(jdbcMapper)
         .withTableName("user_details")
         .withQueryTimeoutSecs(30);
@@ -151,15 +188,14 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
 ```
 
 ### JdbcLookupBolt
-To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that hold the
-hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+To use the `JdbcLookupBolt`, construct an instance of it using a `ConnectionProvider` instance, `JdbcLookupMapper` instance and the select query to execute.
 You can optionally specify a query timeout seconds param that specifies max seconds the select query can take.
 The default is set to value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
 
 ```java
 String selectSql = "select user_name from user_details where user_id = ?";
 SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns)
-JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper)
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
         .withQueryTimeoutSecs(30);
 ```
 
@@ -168,7 +204,7 @@ We also support a trident query state that can be used with trident topologies.
 
 ```java
 JdbcState.Options options = new JdbcState.Options()
-        .withConfigKey("jdbc.conf")
+        .withConnectionProvider(connectionProvider)
         .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
         .withSelectQuery("select user_name from user_details where user_id = ?");
         .withQueryTimeoutSecs(30);
@@ -210,7 +246,7 @@ To make it work with Mysql, you can add the following to the pom.xml
 ```
 
 You can generate a single jar with dependencies using mvn assembly plugin. To use the plugin add the following to your pom.xml and execute
-mvn clean compile assembly:single.
+`mvn clean compile assembly:single` ``` <plugin> http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java index 0d30529..15a2345 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -21,7 +21,7 @@ import backtype.storm.Config; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; -import org.apache.commons.lang.Validate; +import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,22 +36,27 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt { protected transient JdbcClient jdbcClient; protected String configKey; protected Integer queryTimeoutSecs; + protected ConnectionProvider connectionProvider; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { this.collector = collector; - Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey); - Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'"); + connectionProvider.prepare(); if(queryTimeoutSecs == null) { queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); } - this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs); + this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs); } - public AbstractJdbcBolt(String configKey) { - this.configKey = configKey; + public AbstractJdbcBolt(ConnectionProvider 
connectionProvider) { + this.connectionProvider = connectionProvider; + } + + @Override + public void cleanup() { + connectionProvider.cleanup(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java index 131da27..2f29000 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java @@ -22,6 +22,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import org.apache.commons.lang3.StringUtils; import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,8 +43,8 @@ public class JdbcInsertBolt extends AbstractJdbcBolt { private String insertQuery; private JdbcMapper jdbcMapper; - public JdbcInsertBolt(String configKey, JdbcMapper jdbcMapper) { - super(configKey); + public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) { + super(connectionProvider); this.jdbcMapper = jdbcMapper; } http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java index e1b1553..25122e2 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java +++ 
b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java @@ -21,6 +21,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +38,8 @@ public class JdbcLookupBolt extends AbstractJdbcBolt { private JdbcLookupMapper jdbcLookupMapper; - public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) { - super(configKey); + public JdbcLookupBolt(ConnectionProvider connectionProvider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) { + super(connectionProvider); this.selectQuery = selectQuery; this.jdbcLookupMapper = jdbcLookupMapper; } http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java index c531fff..73bc0fd 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java @@ -84,19 +84,18 @@ public class Column<T> implements Serializable { if (this == o) return true; if (!(o instanceof Column)) return false; - Column column = (Column) o; + Column<?> column = (Column<?>) o; if (sqlType != column.sqlType) return false; if (!columnName.equals(column.columnName)) return false; - if (!val.equals(column.val)) return false; + return val != null ? 
val.equals(column.val) : column.val == null; - return true; } @Override public int hashCode() { int result = columnName.hashCode(); - result = 31 * result + val.hashCode(); + result = 31 * result + (val != null ? val.hashCode() : 0); result = 31 * result + sqlType; return result; } http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java new file mode 100644 index 0000000..b838e48 --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java @@ -0,0 +1,26 @@ +package org.apache.storm.jdbc.common; + +import java.io.Serializable; +import java.sql.Connection; +import java.util.Map; + +/** + * Provides a database connection. + */ +public interface ConnectionProvider extends Serializable { + /** + * method must be idempotent. + */ + void prepare(); + + /** + * + * @return a DB connection over which the queries can be executed. + */ + Connection getConnection(); + + /** + * called once when the system is shutting down, should be idempotent. 
+ */ + void cleanup(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java new file mode 100644 index 0000000..b523fcc --- /dev/null +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java @@ -0,0 +1,46 @@ +package org.apache.storm.jdbc.common; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +public class HikariCPConnectionProvider implements ConnectionProvider { + + private Map<String, Object> configMap; + private transient HikariDataSource dataSource; + + public HikariCPConnectionProvider(Map<String, Object> hikariCPConfigMap) { + this.configMap = hikariCPConfigMap; + } + + @Override + public synchronized void prepare() { + if(dataSource == null) { + Properties properties = new Properties(); + properties.putAll(configMap); + HikariConfig config = new HikariConfig(properties); + this.dataSource = new HikariDataSource(config); + this.dataSource.setAutoCommit(false); + } + } + + @Override + public Connection getConnection() { + try { + return this.dataSource.getConnection(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void cleanup() { + if(dataSource != null) { + dataSource.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---------------------------------------------------------------------- diff --git 
a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java index 63797f4..228babe 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java @@ -21,8 +21,6 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,14 +32,11 @@ import java.util.*; public class JdbcClient { private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class); - private HikariDataSource dataSource; + private ConnectionProvider connectionProvider; private int queryTimeoutSecs; - public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) { - Properties properties = new Properties(); - properties.putAll(hikariConfigMap); - HikariConfig config = new HikariConfig(properties); - this.dataSource = new HikariDataSource(config); + public JdbcClient(ConnectionProvider connectionProvider, int queryTimeoutSecs) { + this.connectionProvider = connectionProvider; this.queryTimeoutSecs = queryTimeoutSecs; } @@ -53,7 +48,7 @@ public class JdbcClient { public void executeInsertQuery(String query, List<List<Column>> columnLists) { Connection connection = null; try { - connection = this.dataSource.getConnection(); + connection = connectionProvider.getConnection(); boolean autoCommit = connection.getAutoCommit(); if(autoCommit) { connection.setAutoCommit(false); @@ -110,7 +105,7 @@ public class JdbcClient { public List<List<Column>> select(String sqlQuery, List<Column> queryParams) { Connection connection = null; try { - connection = this.dataSource.getConnection(); + 
connection = connectionProvider.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery); if(queryTimeoutSecs > 0) { preparedStatement.setQueryTimeout(queryTimeoutSecs); @@ -166,7 +161,7 @@ public class JdbcClient { Connection connection = null; List<Column> columns = new ArrayList<Column>(); try { - connection = this.dataSource.getConnection(); + connection = connectionProvider.getConnection(); DatabaseMetaData metaData = connection.getMetaData(); ResultSet resultSet = metaData.getColumns(null, null, tableName, null); while (resultSet.next()) { @@ -183,7 +178,7 @@ public class JdbcClient { public void executeSql(String sql) { Connection connection = null; try { - connection = this.dataSource.getConnection(); + connection = connectionProvider.getConnection(); Statement statement = connection.createStatement(); statement.execute(sql); } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java index 841d5d6..c4005e3 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java @@ -19,6 +19,7 @@ package org.apache.storm.jdbc.mapper; import backtype.storm.tuple.ITuple; import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.apache.storm.jdbc.common.Util; @@ -33,9 +34,10 @@ public class SimpleJdbcMapper implements JdbcMapper { private List<Column> schemaColumns; - public SimpleJdbcMapper(String tableName, Map hikariConfigurationMap) { + public 
SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) { int queryTimeoutSecs = 30; - JdbcClient client = new JdbcClient(hikariConfigurationMap, queryTimeoutSecs); + connectionProvider.prepare(); + JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs); this.schemaColumns = client.getColumnSchema(tableName); } http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java index 0f301f4..8afc466 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java @@ -21,9 +21,9 @@ import backtype.storm.Config; import backtype.storm.topology.FailedException; import backtype.storm.tuple.Values; import com.google.common.collect.Lists; -import org.apache.commons.lang.Validate; import org.apache.commons.lang3.StringUtils; import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; @@ -54,14 +54,14 @@ public class JdbcState implements State { public static class Options implements Serializable { private JdbcMapper mapper; private JdbcLookupMapper jdbcLookupMapper; - private String configKey; + private ConnectionProvider connectionProvider; private String tableName; private String insertQuery; private String selectQuery; private Integer queryTimeoutSecs; - public Options withConfigKey(String configKey) { - this.configKey = configKey; + public Options withConnectionPrvoider(ConnectionProvider 
connectionProvider) { + this.connectionProvider = connectionProvider; return this; } @@ -97,8 +97,7 @@ public class JdbcState implements State { } protected void prepare() { - Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey); - Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'"); + options.connectionProvider.prepare(); if(StringUtils.isBlank(options.insertQuery) && StringUtils.isBlank(options.tableName) && StringUtils.isBlank(options.selectQuery)) { throw new IllegalArgumentException("If you are trying to insert into DB you must supply either insertQuery or tableName." + @@ -109,7 +108,7 @@ public class JdbcState implements State { options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); } - this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs); + this.jdbcClient = new JdbcClient(options.connectionProvider, options.queryTimeoutSecs); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java index 5b3be88..551cd72 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java @@ -25,7 +25,6 @@ import org.junit.Before; import org.junit.Test; import java.sql.Timestamp; -import java.util.Date; import java.sql.Types; import java.util.ArrayList; import java.util.List; @@ -43,9 +42,11 @@ public class JdbcClientTest { map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test map.put("dataSource.user","SA");//root map.put("dataSource.password","");//password + 
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map); + connectionProvider.prepare(); int queryTimeoutSecs = 60; - this.client = new JdbcClient(map, queryTimeoutSecs); + this.client = new JdbcClient(connectionProvider, queryTimeoutSecs); client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)"); } http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java index e94aca2..9df5a86 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java @@ -24,6 +24,8 @@ import backtype.storm.tuple.Fields; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.common.ConnectionProvider; +import org.apache.storm.jdbc.common.HikariCPConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; @@ -56,6 +58,7 @@ public abstract class AbstractUserTopology { protected UserSpout userSpout; protected JdbcMapper jdbcMapper; protected JdbcLookupMapper jdbcLookupMapper; + protected ConnectionProvider connectionProvider; protected static final String TABLE_NAME = "user"; protected static final String JDBC_CONF = "jdbc.conf"; @@ -72,23 +75,29 @@ public abstract class AbstractUserTopology { map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource map.put("dataSource.url", 
args[1]);//jdbc:mysql://localhost/test map.put("dataSource.user", args[2]);//root - map.put("dataSource.password", args[3]);//password + + if(args.length == 4) { + map.put("dataSource.password", args[3]);//password + } Config config = new Config(); config.put(JDBC_CONF, map); + ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map); + connectionProvider.prepare(); int queryTimeoutSecs = 60; - JdbcClient jdbcClient = new JdbcClient(map, queryTimeoutSecs); + JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs); for (String sql : setupSqls) { jdbcClient.executeSql(sql); } this.userSpout = new UserSpout(); - this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map); + this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, connectionProvider); + connectionProvider.cleanup(); Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date"); List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER)); this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); - + this.connectionProvider = new HikariCPConnectionProvider(map); if (args.length == 4) { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, getTopology()); http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java index 7c529c8..585994e 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java @@ -19,8 +19,15 @@ package org.apache.storm.jdbc.topology; import 
backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; +import com.google.common.collect.Lists; import org.apache.storm.jdbc.bolt.JdbcInsertBolt; import org.apache.storm.jdbc.bolt.JdbcLookupBolt; +import org.apache.storm.jdbc.common.Column; +import org.apache.storm.jdbc.mapper.JdbcMapper; +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; + +import java.sql.Types; +import java.util.List; public class UserPersistanceTopology extends AbstractUserTopology { @@ -34,8 +41,15 @@ public class UserPersistanceTopology extends AbstractUserTopology { @Override public StormTopology getTopology() { - JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper); - JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, this.jdbcMapper).withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)"); + JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionProvider, SELECT_QUERY, this.jdbcLookupMapper); + + //must specify column schema when providing custom query. 
+ List<Column> schemaColumns = Lists.newArrayList(new Column("create_date", Types.DATE), + new Column("dept_name", Types.VARCHAR), new Column("user_id", Types.INTEGER), new Column("user_name", Types.VARCHAR)); + JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns); + + JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper) + .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)"); // userSpout ==> jdbcBolt TopologyBuilder builder = new TopologyBuilder(); http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java index 2cf3403..7cf0ce6 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java @@ -44,7 +44,7 @@ public class UserPersistanceTridentTopology extends AbstractUserTopology { TridentTopology topology = new TridentTopology(); JdbcState.Options options = new JdbcState.Options() - .withConfigKey(JDBC_CONF) + .withConnectionPrvoider(connectionProvider) .withMapper(this.jdbcMapper) .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER)))) .withTableName(TABLE_NAME)
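
The `ConnectionProvider` contract introduced by this commit (`prepare()`, `getConnection()`, `cleanup()`) is what lets a topology swap HikariCP for another connection source. The following is only a minimal sketch of an alternative implementation, not code from the commit: the interface is re-declared locally so the snippet compiles on its own, and `DriverManagerConnectionProvider` is a hypothetical, unpooled class that assumes a JDBC driver for the given URL is on the classpath at runtime.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Local copy of the interface shape shown in the diff, so this sketch is self-contained.
interface ConnectionProvider extends Serializable {
    void prepare();
    Connection getConnection();
    void cleanup();
}

// Hypothetical provider (not part of storm-jdbc): no pooling, one new connection per call.
final class DriverManagerConnectionProvider implements ConnectionProvider {
    private final String jdbcUrl;
    private final String user;
    private final String password;

    // transient: runtime state is not serialized with the provider, mirroring the
    // transient data source in HikariCPConnectionProvider, so prepare() must run
    // again wherever the provider is deserialized.
    private transient boolean prepared;

    DriverManagerConnectionProvider(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public synchronized void prepare() {
        // Nothing to build here; setting the flag twice is harmless, so the call is idempotent.
        prepared = true;
    }

    @Override
    public synchronized Connection getConnection() {
        if (!prepared) {
            throw new IllegalStateException("prepare() must be called before getConnection()");
        }
        try {
            return DriverManager.getConnection(jdbcUrl, user, password);
        } catch (SQLException e) {
            // Same wrapping strategy as the HikariCP-backed provider in the diff.
            throw new RuntimeException(e);
        }
    }

    @Override
    public synchronized void cleanup() {
        // Idempotent as well: repeated calls only reset the flag.
        prepared = false;
    }
}
```

Because this sketch hands out a fresh connection on every call, it should be seen as an illustration of the lifecycle only; a real deployment would most likely want pooling, which is what the bundled `HikariCPConnectionProvider` supplies.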