[2/5] storm git commit: Also leave tuple type to log when task is null
Also leave tuple type to log when task is null Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d65cb37 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d65cb37 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d65cb37 Branch: refs/heads/0.10.x-branch Commit: 9d65cb376bedddf3c8f645744aaeaada55e029b6 Parents: 56b9e5a Author: Jungtaek Lim kabh...@gmail.com Authored: Thu Apr 23 08:42:35 2015 +0900 Committer: P. Taylor Goetz ptgo...@gmail.com Committed: Thu May 28 16:00:36 2015 -0400 -- storm-core/src/clj/backtype/storm/daemon/worker.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9d65cb37/storm-core/src/clj/backtype/storm/daemon/worker.clj -- diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index c4afcd9..0fddef5 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -135,7 +135,7 @@ (let [remote (.get remoteMap node+port)] (if (not-nil? task) (.add remote (TaskMessage. task (.serialize serializer tuple))) -(log-warn Can't transfer tuple - task value is null. tuple information: tuple)) +(log-warn Can't transfer tuple - task value is null. tuple type: (type tuple) and information: tuple)) (local-transfer local) (disruptor/publish transfer-queue remoteMap)
[1/5] storm git commit: Log task is null instead of let worker died
Repository: storm Updated Branches: refs/heads/0.10.x-branch f0281ec8a - 02f4ae96b Log task is null instead of let worker died * when task is null in transfer-fn, creating TaskMessage leads NPE Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/56b9e5a6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/56b9e5a6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/56b9e5a6 Branch: refs/heads/0.10.x-branch Commit: 56b9e5a6081f74d285133f24430bb44c4fa11758 Parents: bafb86b Author: Jungtaek Lim kabh...@gmail.com Authored: Fri Apr 17 14:09:39 2015 +0900 Committer: P. Taylor Goetz ptgo...@gmail.com Committed: Thu May 28 16:00:25 2015 -0400 -- storm-core/src/clj/backtype/storm/daemon/worker.clj | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/56b9e5a6/storm-core/src/clj/backtype/storm/daemon/worker.clj -- diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 978ea16..c4afcd9 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -133,8 +133,10 @@ (when (not (.get remoteMap node+port)) (.put remoteMap node+port (ArrayList.))) (let [remote (.get remoteMap node+port)] - (.add remote (TaskMessage. task (.serialize serializer tuple))) - + (if (not-nil? task) +(.add remote (TaskMessage. task (.serialize serializer tuple))) +(log-warn Can't transfer tuple - task value is null. tuple information: tuple)) + (local-transfer local) (disruptor/publish transfer-queue remoteMap) ))]
[2/2] storm git commit: STORM-835 Netty Client hold batch object until io operation complete
STORM-835 Netty Client hold batch object until io operation complete Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a230e5b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a230e5b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a230e5b Branch: refs/heads/0.10.x-branch Commit: 6a230e5b8f304985a40e36d363e985e3a68c1800 Parents: cdce04f Author: zhanghailei zhanghai...@youku.com Authored: Mon May 25 10:00:12 2015 +0800 Committer: P. Taylor Goetz ptgo...@gmail.com Committed: Fri May 29 12:10:31 2015 -0400 -- storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/6a230e5b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java -- diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java index f332bb3..ac3f3f2 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java @@ -461,7 +461,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { * * If the write operation fails, then we will close the channel and trigger a reconnect. */ -private synchronized void flushMessages(Channel channel, final MessageBatch batch) { +private synchronized void flushMessages(Channel channel, MessageBatch batch) { if (!containsMessages(batch)) { return; } @@ -476,7 +476,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject { pendingMessages.getAndAdd(0 - numMessages); if (future.isSuccess()) { LOG.debug(sent {} messages to {}, numMessages, dstAddressPrefixedName); -messagesSent.getAndAdd(batch.size()); +messagesSent.getAndAdd(numMessages); } else { LOG.error(failed to send {} messages to {}: {}, numMessages, dstAddressPrefixedName,
storm git commit: [maven-release-plugin] prepare for next development iteration
Repository: storm Updated Branches: refs/heads/0.9.x-branch c9c47b360 - f38240342 [maven-release-plugin] prepare for next development iteration Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f3824034 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f3824034 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f3824034 Branch: refs/heads/0.9.x-branch Commit: f3824034287ceec3e6da92dbfb819c0eef11dfad Parents: c9c47b3 Author: P. Taylor Goetz ptgo...@gmail.com Authored: Fri May 29 10:50:38 2015 -0400 Committer: P. Taylor Goetz ptgo...@gmail.com Committed: Fri May 29 10:50:38 2015 -0400 -- examples/storm-starter/pom.xml | 2 +- external/storm-hbase/pom.xml | 2 +- external/storm-hdfs/pom.xml | 2 +- external/storm-kafka/pom.xml | 2 +- pom.xml | 4 ++-- storm-buildtools/maven-shade-clojure-transformer/pom.xml | 2 +- storm-core/pom.xml | 2 +- storm-dist/binary/pom.xml| 2 +- storm-dist/source/pom.xml| 2 +- 9 files changed, 10 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/f3824034/examples/storm-starter/pom.xml -- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index 74ae50f..f2aacd0 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -20,7 +20,7 @@ parent artifactIdstorm/artifactId groupIdorg.apache.storm/groupId - version0.9.5/version + version0.9.6-SNAPSHOT/version relativePath../../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/storm/blob/f3824034/external/storm-hbase/pom.xml -- diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml index bec0869..fe19688 100644 --- a/external/storm-hbase/pom.xml +++ b/external/storm-hbase/pom.xml @@ -21,7 +21,7 @@ parent artifactIdstorm/artifactId groupIdorg.apache.storm/groupId -version0.9.5/version +version0.9.6-SNAPSHOT/version relativePath../../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/storm/blob/f3824034/external/storm-hdfs/pom.xml -- diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index 6284865..a265490 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -21,7 +21,7 @@ parent artifactIdstorm/artifactId groupIdorg.apache.storm/groupId -version0.9.5/version +version0.9.6-SNAPSHOT/version relativePath../../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/storm/blob/f3824034/external/storm-kafka/pom.xml -- diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml index 01a262b..36f77c3 100644 --- a/external/storm-kafka/pom.xml +++ b/external/storm-kafka/pom.xml @@ -21,7 +21,7 @@ parent artifactIdstorm/artifactId groupIdorg.apache.storm/groupId -version0.9.5/version +version0.9.6-SNAPSHOT/version relativePath../../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/storm/blob/f3824034/pom.xml -- diff --git a/pom.xml b/pom.xml index 06e9f26..80fa42b 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ groupIdorg.apache.storm/groupId artifactIdstorm/artifactId -version0.9.5/version +version0.9.6-SNAPSHOT/version packagingpom/packaging nameStorm/name descriptionDistributed and fault-tolerant realtime computation/description @@ -166,7 +166,7 @@ scm connectionscm:git:https://git-wip-us.apache.org/repos/asf/storm.git/connection developerConnectionscm:git:https://git-wip-us.apache.org/repos/asf/storm.git/developerConnection -tagv0.9.5/tag +tagHEAD/tag urlhttps://git-wip-us.apache.org/repos/asf/storm/url /scm http://git-wip-us.apache.org/repos/asf/storm/blob/f3824034/storm-buildtools/maven-shade-clojure-transformer/pom.xml -- diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml index ebfe6ab..dfca2a4 100644 ---
[1/9] storm git commit: STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation. Also made changes to make the password arg optional
Repository: storm Updated Branches: refs/heads/0.10.x-branch 7e5e5f0e3 - 511869c6f STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation. Also made changes to make the password arg optional in the example topologies. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7771a4dd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7771a4dd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7771a4dd Branch: refs/heads/0.10.x-branch Commit: 7771a4ddd7bbbda4cbb799b14e06d05693d834e6 Parents: 1498ed0 Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Authored: Fri May 15 16:32:44 2015 -0700 Committer: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Committed: Wed May 20 14:49:27 2015 -0700 -- external/storm-jdbc/README.md | 70 +++- .../storm/jdbc/bolt/AbstractJdbcBolt.java | 16 +++-- .../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 5 +- .../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 5 +- .../storm/jdbc/common/ConnectionPrvoider.java | 26 .../jdbc/common/HikariCPConnectionProvider.java | 46 + .../apache/storm/jdbc/common/JdbcClient.java| 19 ++ .../storm/jdbc/mapper/SimpleJdbcMapper.java | 6 +- .../storm/jdbc/trident/state/JdbcState.java | 12 ++-- .../storm/jdbc/common/JdbcClientTest.java | 4 +- .../jdbc/topology/AbstractUserTopology.java | 17 +++-- .../jdbc/topology/UserPersistanceTopology.java | 18 - .../UserPersistanceTridentTopology.java | 2 +- 13 files changed, 192 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/7771a4dd/external/storm-jdbc/README.md -- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md index ef7845a..81632f1 100644 --- a/external/storm-jdbc/README.md +++ b/external/storm-jdbc/README.md @@ -1,10 +1,37 @@ #Storm JDBC Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples -in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP. +in a storm topology. ## Inserting into a database. The bolt and trident state included in this package for inserting data into a database tables are tied to a single table. + +### ConnectionProvider +An interface that should be implemented by different connection pooling mechanism `org.apache.storm.jdbc.common.ConnectionPrvoider` + +```java +public interface ConnectionPrvoider extends Serializable { +/** + * method must be idempotent. + */ +void prepare(); + +/** + * + * @return a DB connection over which the queries can be executed. + */ +Connection getConnection(); + +/** + * called once when the system is shutting down, should be idempotent. + */ +void cleanup(); +} +``` + +Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an implementation that uses HikariCP. + +###JdbcMapper The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface: ```java @@ -17,11 +44,12 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re **The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list.** For example if the user supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())` the 1st item of the returned list of `getColumns` method will map to the 1st place holder and the 2nd to the 2nd and so on. We do not parse -the supplied queries to try and resolve place holder by column names. +the supplied queries to try and resolve place holder by column names. Not making any assumptions about the query syntax allows this connector +to be used by some non-standard sql frameworks like Pheonix which only supports upsert into. ### JdbcInsertBolt -To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the -hikari configuration map and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply +To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation +and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply a table name using `withTableName` method or an insert query using
[4/9] storm git commit: Fixing typos in README
Fixing typos in README Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a633fe13 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a633fe13 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a633fe13 Branch: refs/heads/0.10.x-branch Commit: a633fe13bef01e9f3d1c559ca13f45bf8f1dbda2 Parents: d268903 Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Authored: Tue May 26 15:51:06 2015 -0700 Committer: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Committed: Tue May 26 15:51:06 2015 -0700 -- external/storm-jdbc/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/a633fe13/external/storm-jdbc/README.md -- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md index 81632f1..192be73 100644 --- a/external/storm-jdbc/README.md +++ b/external/storm-jdbc/README.md @@ -49,7 +49,7 @@ to be used by some non-standard sql frameworks like Pheonix which only supports ### JdbcInsertBolt To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation -and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply +and a `JdbcMapper` implementation that converts storm tuple to DB row. In addition, you must either supply a table name using `withTableName` method or an insert query using `withInsertQuery`. If you specify a insert query you should ensure that your `JdbcMapper` implementation will return a list of columns in the same order as in your insert query. You can optionally specify a query timeout seconds param that specifies max seconds an insert query can take.
[1/2] storm git commit: Merge branch 'STORM-821' of github.com:Parth-Brahmbhatt/incubator-storm into 0.10.x-branch
Repository: storm Updated Branches: refs/heads/master 13c33f335 - 0c2b3a4f6 Merge branch 'STORM-821' of github.com:Parth-Brahmbhatt/incubator-storm into 0.10.x-branch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05d1f8b2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05d1f8b2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05d1f8b2 Branch: refs/heads/master Commit: 05d1f8b2451a0f958ff079699862c219ddb26833 Parents: 13c33f3 Author: P. Taylor Goetz ptgo...@gmail.com Authored: Fri May 29 15:41:12 2015 -0400 Committer: P. Taylor Goetz ptgo...@gmail.com Committed: Fri May 29 15:47:08 2015 -0400 -- external/storm-jdbc/README.md | 72 +++- .../storm/jdbc/bolt/AbstractJdbcBolt.java | 17 +++-- .../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 5 +- .../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 5 +- .../org/apache/storm/jdbc/common/Column.java| 7 +- .../storm/jdbc/common/ConnectionProvider.java | 26 +++ .../jdbc/common/HikariCPConnectionProvider.java | 46 + .../apache/storm/jdbc/common/JdbcClient.java| 19 ++ .../storm/jdbc/mapper/SimpleJdbcMapper.java | 6 +- .../storm/jdbc/trident/state/JdbcState.java | 13 ++-- .../storm/jdbc/common/JdbcClientTest.java | 5 +- .../jdbc/topology/AbstractUserTopology.java | 17 +++-- .../jdbc/topology/UserPersistanceTopology.java | 18 - .../UserPersistanceTridentTopology.java | 2 +- 14 files changed, 196 insertions(+), 62 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/README.md -- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md index ef7845a..2d1301a 100644 --- a/external/storm-jdbc/README.md +++ b/external/storm-jdbc/README.md @@ -1,10 +1,37 @@ #Storm JDBC Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples -in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP. +in a storm topology. ## Inserting into a database. The bolt and trident state included in this package for inserting data into a database tables are tied to a single table. + +### ConnectionProvider +An interface that should be implemented by different connection pooling mechanism `org.apache.storm.jdbc.common.ConnectionPrvoider` + +```java +public interface ConnectionPrvoider extends Serializable { +/** + * method must be idempotent. + */ +void prepare(); + +/** + * + * @return a DB connection over which the queries can be executed. + */ +Connection getConnection(); + +/** + * called once when the system is shutting down, should be idempotent. + */ +void cleanup(); +} +``` + +Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an implementation that uses HikariCP. + +###JdbcMapper The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface: ```java @@ -17,11 +44,12 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re **The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list.** For example if the user supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())` the 1st item of the returned list of `getColumns` method will map to the 1st place holder and the 2nd to the 2nd and so on. We do not parse -the supplied queries to try and resolve place holder by column names. +the supplied queries to try and resolve place holder by column names. Not making any assumptions about the query syntax allows this connector +to be used by some non-standard sql frameworks like Pheonix which only supports upsert into. ### JdbcInsertBolt -To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the -hikari configuration map and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply +To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation +and a `JdbcMapper` implementation that converts storm tuple to DB row. In addition, you must either supply a table name using `withTableName` method or an insert query using `withInsertQuery`. If you specify a insert query you should ensure that your `JdbcMapper`
[6/9] storm git commit: Fixing README
Fixing README Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7062caa0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7062caa0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7062caa0 Branch: refs/heads/0.10.x-branch Commit: 7062caa02f01e4963919ff4ba7b80bc21a069855 Parents: 7ca8e29 Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Authored: Tue May 26 16:12:38 2015 -0700 Committer: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Committed: Tue May 26 16:12:38 2015 -0700 -- external/storm-jdbc/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/7062caa0/external/storm-jdbc/README.md -- diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md index 8595a39..2d1301a 100644 --- a/external/storm-jdbc/README.md +++ b/external/storm-jdbc/README.md @@ -98,7 +98,7 @@ hikariConfigMap.put(dataSource.user,root); hikariConfigMap.put(dataSource.password,password); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map); String tableName = user_details; -JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map); +JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider); ``` The mapper initialized in the example above assumes a storm tuple has value for all the columns of the table you intend to insert data into and its `getColumn` method will return the columns in the order in which Jdbc connection instance's `connection.getMetaData().getColumns();` method returns them.
[3/9] storm git commit: STORM-821: Renaming the connection provider to fix the type. Allowing column val to be null.
STORM-821: Renaming the connection provider to fix the type. Allowing column val to be null. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d268903f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d268903f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d268903f Branch: refs/heads/0.10.x-branch Commit: d268903f026cf275b72e6246b61c769d28fb3ed1 Parents: f628bb4 Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Authored: Tue May 26 15:46:40 2015 -0700 Committer: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Committed: Tue May 26 15:46:40 2015 -0700 -- .../storm/jdbc/bolt/AbstractJdbcBolt.java | 15 ++- .../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 6 ++--- .../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 6 ++--- .../org/apache/storm/jdbc/common/Column.java| 7 +++--- .../storm/jdbc/common/ConnectionProvider.java | 26 .../storm/jdbc/common/ConnectionPrvoider.java | 26 .../jdbc/common/HikariCPConnectionProvider.java | 2 +- .../apache/storm/jdbc/common/JdbcClient.java| 14 +-- .../storm/jdbc/mapper/SimpleJdbcMapper.java | 8 +++--- .../storm/jdbc/trident/state/JdbcState.java | 13 +- .../storm/jdbc/common/JdbcClientTest.java | 7 +++--- .../jdbc/topology/AbstractUserTopology.java | 16 ++-- .../jdbc/topology/UserPersistanceTopology.java | 4 +-- .../UserPersistanceTridentTopology.java | 2 +- 14 files changed, 74 insertions(+), 78 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java -- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java index d1f5758..15a2345 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java @@ -21,8 +21,7 @@ import backtype.storm.Config; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; -import org.apache.commons.lang.Validate; -import org.apache.storm.jdbc.common.ConnectionPrvoider; +import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,27 +36,27 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt { protected transient JdbcClient jdbcClient; protected String configKey; protected Integer queryTimeoutSecs; -protected ConnectionPrvoider connectionPrvoider; +protected ConnectionProvider connectionProvider; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { this.collector = collector; -connectionPrvoider.prepare(); +connectionProvider.prepare(); if(queryTimeoutSecs == null) { queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); } -this.jdbcClient = new JdbcClient(connectionPrvoider, queryTimeoutSecs); +this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs); } -public AbstractJdbcBolt(ConnectionPrvoider connectionPrvoider) { -this.connectionPrvoider = connectionPrvoider; +public AbstractJdbcBolt(ConnectionProvider connectionProvider) { +this.connectionProvider = connectionProvider; } @Override public void cleanup() { -connectionPrvoider.cleanup(); +connectionProvider.cleanup(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java -- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java index 0dc7f26..2f29000 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java @@ -22,7 +22,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import org.apache.commons.lang3.StringUtils; import org.apache.storm.jdbc.common.Column; -import org.apache.storm.jdbc.common.ConnectionPrvoider; +import org.apache.storm.jdbc.common.ConnectionProvider; import
[9/9] storm git commit: add STORM-821 to changelog
add STORM-821 to changelog Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/511869c6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/511869c6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/511869c6 Branch: refs/heads/0.10.x-branch Commit: 511869c6f2eac2940941c735eb8f1559b143ca30 Parents: caaefe6 Author: P. Taylor Goetz ptgo...@gmail.com Authored: Fri May 29 15:44:25 2015 -0400 Committer: P. Taylor Goetz ptgo...@gmail.com Committed: Fri May 29 15:44:25 2015 -0400 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/511869c6/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index b0b8d0e..fc49984 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 0.10.0 + * STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation. * STORM-737] Check task-node+port with read lock to prevent sending to closed connection * STORM-835 Netty Client hold batch object until io operation complete * STORM-827: Allow AutoTGT to work with storm-hdfs too.
[2/9] storm git commit: Merge branch '0.10.x-branch' of https://github.com/apache/incubator-storm into STORM-821
Merge branch '0.10.x-branch' of https://github.com/apache/incubator-storm into STORM-821 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f628bb49 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f628bb49 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f628bb49 Branch: refs/heads/0.10.x-branch Commit: f628bb4977cd908f8b85eca553604011248ac26f Parents: 7771a4d 90c5994 Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Authored: Tue May 26 15:41:14 2015 -0700 Committer: Parth Brahmbhatt brahmbhatt.pa...@gmail.com Committed: Tue May 26 15:41:14 2015 -0700 -- CHANGELOG.md| 1 + external/storm-redis/README.md | 4 ++-- .../storm/redis/trident/state/RedisClusterStateUpdater.java | 9 +++-- .../apache/storm/redis/trident/state/RedisStateUpdater.java | 9 +++-- .../apache/storm/redis/trident/WordCountTridentRedis.java | 2 +- .../storm/redis/trident/WordCountTridentRedisCluster.java | 2 +- storm-dist/binary/src/main/assembly/binary.xml | 2 +- 7 files changed, 20 insertions(+), 9 deletions(-) --
[1/3] storm git commit: sync storm-redis with master
Repository: storm Updated Branches: refs/heads/0.10.x-branch 511869c6f - 81505f9c7 http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java -- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java index 535d7b9..77c6ee8 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java @@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; +import backtype.storm.tuple.ITuple; import backtype.storm.tuple.Tuple; import org.apache.storm.redis.bolt.AbstractRedisBolt; -import org.apache.storm.redis.util.config.JedisClusterConfig; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.bolt.RedisStoreBolt; +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCommands; @@ -36,46 +40,11 @@ import redis.clients.jedis.exceptions.JedisException; public class PersistentWordCount { private static final String WORD_SPOUT = WORD_SPOUT; private static final String COUNT_BOLT = COUNT_BOLT; -private static final String REDIS_BOLT = REDIS_BOLT; +private static final String STORE_BOLT = STORE_BOLT; private static final String TEST_REDIS_HOST = 127.0.0.1; private static final int TEST_REDIS_PORT = 6379; -public static class StoreCountRedisBolt extends AbstractRedisBolt { -private static final Logger LOG = LoggerFactory.getLogger(StoreCountRedisBolt.class); - -public StoreCountRedisBolt(JedisPoolConfig config) { -super(config); -} - -public StoreCountRedisBolt(JedisClusterConfig config) { -super(config); -} - -@Override -public void execute(Tuple input) { -String word = input.getStringByField(word); -int count = input.getIntegerByField(count); - -JedisCommands commands = null; -try { -commands = getInstance(); -commands.incrBy(word, count); -} catch (JedisConnectionException e) { -throw new RuntimeException(Unfortunately, this test requires redis-server running, e); -} catch (JedisException e) { -LOG.error(Exception occurred from Jedis/Redis, e); -} finally { -returnInstance(commands); -this.collector.ack(input); -} -} - -@Override -public void declareOutputFields(OutputFieldsDeclarer declarer) { -} -} - public static void main(String[] args) throws Exception { Config config = new Config(); @@ -92,14 +61,15 @@ public class PersistentWordCount { WordSpout spout = new WordSpout(); WordCounter bolt = new WordCounter(); -StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig); +RedisStoreMapper storeMapper = setupStoreMapper(); +RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); // wordSpout == countBolt == RedisBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 1); -builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); -builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields(word)); +builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields(word)); +builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT); if (args.length == 2) { LocalCluster cluster = new LocalCluster(); @@ -114,4 +84,33 @@ public class PersistentWordCount { System.out.println(Usage: PersistentWordCount redis host redis port (topology name)); } } + +private static RedisStoreMapper setupStoreMapper() { +return new WordCountStoreMapper(); +} + +private static class WordCountStoreMapper implements RedisStoreMapper { +private RedisDataTypeDescription description; +private final String hashKey = wordCount; + +public WordCountStoreMapper() { +description = new RedisDataTypeDescription( +RedisDataTypeDescription.RedisDataType.HASH,
Git Push Summary
Repository: storm Updated Tags: refs/tags/v0.9.5 [deleted] fdab9194e