[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-05-26 Thread Crystark
Github user Crystark commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-105472684
  
One more thing I noticed: 
[Column.hashCode](https://github.com/apache/storm/blob/master/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java#L99)
 doesn't support the value being null. I tried using those columns as keys 
of a HashMap and got an NPE.
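A null-tolerant `equals`/`hashCode` pair could look like the sketch below. It is a hypothetical, trimmed-down `Column<T>` (name, value, SQL type), not the class from the PR, and it assumes Java 7+ for `java.util.Objects`, whose `hash` and `equals` methods accept `null` arguments without throwing.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical, trimmed-down stand-in for storm-jdbc's Column<T>;
// only the fields needed to illustrate a null-safe hashCode/equals.
class Column<T> implements Serializable {
    private final String columnName;
    private final T val;      // may legitimately be null (SQL NULL)
    private final int sqlType;

    Column(String columnName, T val, int sqlType) {
        this.columnName = columnName;
        this.val = val;
        this.sqlType = sqlType;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Column)) return false;
        Column<?> other = (Column<?>) o;
        // Objects.equals treats two nulls as equal and never throws on null.
        return sqlType == other.sqlType
                && Objects.equals(columnName, other.columnName)
                && Objects.equals(val, other.val);
    }

    @Override
    public int hashCode() {
        // Objects.hash hashes a null element as 0 instead of throwing an NPE.
        return Objects.hash(columnName, val, sqlType);
    }
}
```

On Java 6 the same effect needs explicit `val == null ? 0 : val.hashCode()` checks.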


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-05-26 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-105598508
  
@Crystark You are right, I will fix it. 



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-24 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-75847524
  
I am now getting a compilation error:
```
[ERROR] external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java:[30,29] incomparable types: int and java.lang.Object
```
Not sure if this is JDK8 specific or not.
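The offending line in `Util.java` is not quoted in the thread. Assuming it compared an `int` with the `Object` returned by reflection (for example, when looking up the name of a `java.sql.Types` constant), a form that compiles on both old and new JDKs compares boxed values with `equals`. `SqlTypeNames` and `getSqlTypeName` are hypothetical names used only for this sketch.

```java
import java.lang.reflect.Field;
import java.sql.Types;

// Hypothetical helper: map a java.sql.Types int constant back to its name.
final class SqlTypeNames {
    private SqlTypeNames() {}

    static String getSqlTypeName(int sqlType) {
        for (Field field : Types.class.getFields()) {
            try {
                Object value = field.get(null); // static int field, returned boxed as Integer
                // `sqlType == value` (int vs Object) is rejected by JDK 8's javac
                // with "incomparable types"; compare boxed values instead.
                if (Integer.valueOf(sqlType).equals(value)) {
                    return field.getName();
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return null; // unknown type code
    }
}
```

On Java 8 and later, `java.sql.JDBCType.valueOf(int)` performs the same lookup without reflection (it throws `IllegalArgumentException` for unknown codes).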



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-24 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-75848428
  
Yes, it looks like it is a JDK8 issue. JDK6 compiles the code just fine. 
I'll file a new JIRA for it.



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-23 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-75650396
  
Thanks @Parth-Brahmbhatt, merged into master.



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/374



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-19 Thread Crystark
Github user Crystark commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-75028911
  
Works like a charm :)



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-19 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-75185107
  
+1. @Parth-Brahmbhatt Thanks for the quick fix. @Crystark Thanks for the 
review and nice catch.
@revans2 @ptgoetz I think this is ready to merge in; I will give it a day 
or two before pushing it to master. 



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-18 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-74940637
  
@Crystark Super sorry about this, I am not sure how the heck I missed 
it. The code you pointed at is fine, but I should be using executeBatch 
instead of executeUpdate. I have modified the code to use executeBatch and also 
modified the test to cover the bug you pointed out.
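With JDBC batching, each row is bound and then queued with `addBatch()`, and a single `executeBatch()` sends all queued rows. The sketch below is a hypothetical simplification, not the PR's `JdbcClient`: rows are plain `List<Object>` values bound with `setObject` instead of `Column` objects, and it assumes Java 8+ (`String.join`, try-with-resources).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical sketch: insert many rows with one prepared statement and a JDBC batch.
final class BatchInsert {
    private BatchInsert() {}

    // Builds e.g. "INSERT INTO t (a,b) VALUES (?,?)": one placeholder per column.
    static String insertSql(String table, List<String> columns) {
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            placeholders.append(i == 0 ? "?" : ",?");
        }
        return "INSERT INTO " + table + " (" + String.join(",", columns)
                + ") VALUES (" + placeholders + ")";
    }

    static int insert(Connection conn, String table, List<String> columns,
                      List<List<Object>> rows, int queryTimeoutSecs) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(insertSql(table, columns))) {
            ps.setQueryTimeout(queryTimeoutSecs);
            for (List<Object> row : rows) {
                for (int i = 0; i < row.size(); i++) {
                    // JDBC parameters are 1-based; some drivers need setNull for nulls.
                    ps.setObject(i + 1, row.get(i));
                }
                ps.addBatch(); // queue this row; without it the next row overwrites it
            }
            int total = 0;
            for (int count : ps.executeBatch()) {
                // Drivers may report Statement.SUCCESS_NO_INFO (-2) instead of a count.
                if (count > 0) total += count;
            }
            return total;
        }
    }
}
```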



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-18 Thread Crystark
Github user Crystark commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24902452
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
+
+private HikariDataSource dataSource;
+private int queryTimeoutSecs;
+
+public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
+this.queryTimeoutSecs = queryTimeoutSecs;
+}
+
+public int insert(String tableName, List<List<Column>> columnLists) {
+Connection connection = null;
+try {
+connection = this.dataSource.getConnection();
+StringBuilder sb = new StringBuilder();
+sb.append("Insert into ").append(tableName).append(" (");
+Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+@Override
+public String apply(Column input) {
+return input.getColumnName();
+}
+});
+String columns = Joiner.on(",").join(columnNames);
+sb.append(columns).append(") values ( ");
+
+String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+sb.append(placeHolders).append(")");
--- End diff --

The problem with this part is that there is only one set of values, even though 
this method works with multiple sets of values.
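Besides JDBC batching, another way to handle several rows is one multi-row `VALUES (...), (...)` statement with a placeholder group per row. Support for multi-row `VALUES` and the maximum number of bind parameters vary by database, so this is an assumption to check against the target driver. `MultiRowInsertSql` is a hypothetical helper (Java 8+).

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper: one INSERT with a placeholder group per row, e.g.
// INSERT INTO t (a,b) VALUES (?,?),(?,?) for two rows of two columns.
final class MultiRowInsertSql {
    private MultiRowInsertSql() {}

    static String build(String table, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount < 1) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        // "(?,?,...)" with one "?" per column
        String group = "(" + String.join(",", Collections.nCopies(columns.size(), "?")) + ")";
        return "INSERT INTO " + table + " (" + String.join(",", columns) + ") VALUES "
                + String.join(",", Collections.nCopies(rowCount, group));
    }
}
```

Parameters would then be bound in row-major order: the value for row `r`, column `c` goes to index `r * columns.size() + c + 1`.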



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-18 Thread Crystark
Github user Crystark commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24902378
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
+
+private HikariDataSource dataSource;
+private int queryTimeoutSecs;
+
+public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) {
+Properties properties = new Properties();
+properties.putAll(hikariConfigMap);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
+this.queryTimeoutSecs = queryTimeoutSecs;
+}
+
+public int insert(String tableName, List<List<Column>> columnLists) {
+Connection connection = null;
+try {
+connection = this.dataSource.getConnection();
+StringBuilder sb = new StringBuilder();
+sb.append("Insert into ").append(tableName).append(" (");
+Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+@Override
+public String apply(Column input) {
+return input.getColumnName();
+}
+});
+String columns = Joiner.on(",").join(columnNames);
+sb.append(columns).append(") values ( ");
+
+String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+sb.append(placeHolders).append(")");
+
+String query = sb.toString();
+
+LOG.debug("Executing query {}", query);
+
+
+PreparedStatement preparedStatement = connection.prepareStatement(query);
+preparedStatement.setQueryTimeout(queryTimeoutSecs);
+for(List<Column> columnList : columnLists) {
+setPreparedStatementParams(preparedStatement, columnList);
+}
--- End diff --

Here each `columnList` of `columnLists` overrides the previous value.



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-10 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24434272
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
+
+private HikariDataSource dataSource;
+private int queryTimeoutSecs;
+
+public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
+this.queryTimeoutSecs = queryTimeoutSecs;
+}
+
+public int insert(String tableName, List<List<Column>> columnLists) {
+Connection connection = null;
+try {
+connection = this.dataSource.getConnection();
+StringBuilder sb = new StringBuilder();
+sb.append("Insert into ").append(tableName).append(" (");
+Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+@Override
+public String apply(Column input) {
+return input.getColumnName();
+}
+});
+String columns = Joiner.on(",").join(columnNames);
+sb.append(columns).append(") values ( ");
+
+String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+sb.append(placeHolders).append(")");
--- End diff --

I am not a DB expert, but I read up on JDBC some more and talked to 
@kishorvpatil, and I think the current code is going to be fine. I am a little 
concerned about the overhead of doing string manipulations to recreate each 
prepared statement, but I am OK with waiting for it to become a problem before 
we implement some sort of caching.



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-10 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24461061
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
 ---
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from any database.
+ */
+public class JdbcLookupBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
+
+private String selectQuery;
+
+private JdbcLookupMapper jdbcLookupMapper;
+
+public JdbcLookupBolt(String configKey) {
+super(configKey);
+}
+
+public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+this.jdbcLookupMapper = jdbcLookupMapper;
+return this;
+}
+
+public JdbcLookupBolt withSelectSql(String selectQuery) {
+this.selectQuery = selectQuery;
+return this;
+}
+
+public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+this.queryTimeoutSecs = queryTimeoutSecs;
+return this;
+}
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcLookupMapper.getColumns(tuple);
+List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
--- End diff --

This is done; I added all required params as part of the constructors.



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24043926
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+private String tableName;
+private JdbcMapper jdbcMapper;
+
+public JdbcBolt withConfigKey(String configKey) {
--- End diff --

OK, so I see two ways to do the Hikari configuration.
The current way:
```
Map<String, String> jdbcConf = new HashMap<String, String>();
jdbcConf.put(a,b);
stormConf.put(something, jdbcConf);
bolt.withConfigKey(something);
```

Or we could just pass the JDBC config to the bolt directly, without sending it 
through the topology config first:

```
Map<String, String> jdbcConf = new HashMap<String, String>();
jdbcConf.put(a,b);
bolt.withConfig(jdbcConf);
``` 
I don't understand why we need the extra step of storing a config inside 
another config, which adds a layer of indirection.

As for constructor values vs. builder values, it is a question of required 
vs. optional parameters.

```
Map<String, String> jdbcConf = new HashMap<String, String>();
jdbcConf.put(a,b);
stormConf.put(something, jdbcConf);
//oops forgot this bolt.withConfigKey(something);
```
results in an NPE after the topology is launched. If it were a constructor 
parameter instead,
```
Map<String, String> jdbcConf = new HashMap<String, String>();
jdbcConf.put(a,b);
stormConf.put(something, jdbcConf);
new Bolt(/*forgot something*/);
```
is a compile-time error. I am fine with having a builder pattern for 
anything that is optional, or even a way to override/extend required values. I 
see it as defensive programming.
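The constructor-versus-builder argument above can be sketched as follows. `InsertBoltSketch` is a hypothetical class, not the PR's `JdbcBolt`: it takes the required Hikari settings map and table name in the constructor, passes the JDBC config directly instead of through a topology config key, and keeps only the query timeout as an optional builder method. The 30-second default is an assumption. Requires Java 7+ for `Objects.requireNonNull`.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical bolt-like class: required values via constructor, optional via builder.
class InsertBoltSketch implements Serializable {
    private final Map<String, Object> hikariConfigMap; // required: pool settings
    private final String tableName;                    // required: target table
    private int queryTimeoutSecs = 30;                 // optional: assumed default

    InsertBoltSketch(Map<String, Object> hikariConfigMap, String tableName) {
        // Fail fast while the topology is being built, not with an NPE after launch.
        this.hikariConfigMap = new HashMap<>(Objects.requireNonNull(hikariConfigMap, "hikariConfigMap"));
        this.tableName = Objects.requireNonNull(tableName, "tableName");
    }

    // Optional setting with a sensible default, so a builder method fits here.
    InsertBoltSketch withQueryTimeoutSecs(int queryTimeoutSecs) {
        this.queryTimeoutSecs = queryTimeoutSecs;
        return this;
    }

    int getQueryTimeoutSecs() { return queryTimeoutSecs; }
    String getTableName() { return tableName; }
    Map<String, Object> getHikariConfigMap() { return hikariConfigMap; }
}
```

With this shape, leaving out the table name is a compile error (wrong number of arguments), and passing `null` fails immediately on the submitting side.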




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24044371
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+public class Column<T> implements Serializable {
+
+private String columnName;
+private T val;
+private int sqlType;
+
+public Column(String columnName, T val, int sqlType) {
--- End diff --

The javadocs are what I was really interested in. The markdown doc was 
really great, but an int for sqlType is really confusing without some 
documentation right there.
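Javadoc along the following lines would make the `int sqlType` parameter self-explanatory by pointing readers at `java.sql.Types`. The class body is trimmed to the constructor and getters, and the wording is only a suggestion; in particular, the note that the type selects how the value is bound to a `PreparedStatement` is an assumption about how the client uses it.

```java
import java.io.Serializable;
import java.sql.Types;

/**
 * A single named value to bind into (or read from) a SQL statement.
 *
 * @param <T> Java type of the value, e.g. {@code String} for {@link Types#VARCHAR}
 */
class Column<T> implements Serializable {
    private final String columnName;
    private final T val;
    private final int sqlType;

    /**
     * @param columnName name of the database column
     * @param val        value to bind; may be {@code null} to represent SQL NULL
     * @param sqlType    one of the constants in {@link java.sql.Types}, e.g.
     *                   {@link Types#INTEGER} or {@link Types#VARCHAR}; used to decide
     *                   how {@code val} is bound to a {@code PreparedStatement}
     */
    Column(String columnName, T val, int sqlType) {
        this.columnName = columnName;
        this.val = val;
        this.sqlType = sqlType;
    }

    String getColumnName() { return columnName; }
    T getVal() { return val; }
    int getSqlType() { return sqlType; }
}
```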



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24015372
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
--- End diff --

Most, if not all of these imports do not seem to be used.



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24014941
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
--- End diff --

I'm not sure I like this name. I would rather have it called something 
like JdbcInsertBolt, because Jdbc by itself does not indicate that it 
will insert anything into the database.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-72685012
  
For the most part it looks OK. I am concerned that a few of the APIs seem 
confusing to me and could use better javadocs. I am also concerned about 
the performance of some of the SQL involved: we are regenerating a lot of the 
SQL from text for each query. I am not a SQL expert, but I thought that the 
idea behind prepared statements was that the SQL engine does not 
have to reparse and recompile the query each time. I'm not sure how each server 
does that internally, but doing string manipulation for 
each query, instead of once when the bolt is prepared, seems problematic.
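One way to generate the SQL text once rather than per query is to cache it per table, sketched below under the assumption that a given bolt always writes the same column set to a table (the cache key is the table name only). `InsertSqlCache` is hypothetical (Java 8+). With a connection pool such as HikariCP, the `PreparedStatement` itself still has to be prepared on each borrowed connection; whether the server or driver reuses a parsed plan is up to the driver.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache: generate each table's INSERT text once and reuse it.
// Assumes the column list for a given table never changes for this bolt.
final class InsertSqlCache {
    private final ConcurrentMap<String, String> sqlByTable = new ConcurrentHashMap<>();
    private final AtomicInteger builds = new AtomicInteger(); // times SQL text was generated

    String sqlFor(String table, List<String> columns) {
        // computeIfAbsent runs build(...) only when the table is not cached yet.
        return sqlByTable.computeIfAbsent(table, t -> build(t, columns));
    }

    private String build(String table, List<String> columns) {
        builds.incrementAndGet();
        StringBuilder sb = new StringBuilder("INSERT INTO ").append(table).append(" (")
                .append(String.join(",", columns)).append(") VALUES (");
        for (int i = 0; i < columns.size(); i++) {
            sb.append(i == 0 ? "?" : ",?");
        }
        return sb.append(')').toString();
    }

    int buildCount() { return builds.get(); }
}
```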

Also, the Columns magic we are doing seems OK, but like I said, I am not a 
SQL expert, and it feels like just using a stored query would be a lot simpler 
than all of this. 



[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24014635
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+private String tableName;
+private JdbcMapper jdbcMapper;
+
+public JdbcBolt withConfigKey(String configKey) {
--- End diff --

If this is required, I would rather see it be part of a constructor than a
builder method, and the same goes for the others.  Builder methods are great
for optional values that have a decent default, but if something is required
it should be in the constructor.  And why do we want a config key?  The JDBC
config already has to be serializable to both JSON and YAML to be in the storm
config, so it feels like we could just pass the JDBC config into the bolt
instead, unless you feel it really needs to be in the config for debug
purposes.
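A minimal sketch of the pattern described above, using a hypothetical `ExampleJdbcBolt` (not the actual `JdbcBolt`) and an illustrative 30-second default: the required config key and table name are constructor arguments, while the optional query timeout is a chained builder method.

```java
import java.util.Objects;

// Hypothetical sketch, not the real JdbcBolt: required settings are constructor
// arguments, optional settings are chained builder methods with a default.
class ExampleJdbcBolt {
    // Illustrative default for an optional setting.
    static final int DEFAULT_QUERY_TIMEOUT_SECS = 30;

    private final String configKey;  // required
    private final String tableName;  // required
    private int queryTimeoutSecs = DEFAULT_QUERY_TIMEOUT_SECS;  // optional

    ExampleJdbcBolt(String configKey, String tableName) {
        // Fail fast while the topology is being built, not at runtime.
        this.configKey = Objects.requireNonNull(configKey, "configKey");
        this.tableName = Objects.requireNonNull(tableName, "tableName");
    }

    // Optional value: returns this so calls can be chained.
    ExampleJdbcBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
        this.queryTimeoutSecs = queryTimeoutSecs;
        return this;
    }

    String getConfigKey() { return configKey; }
    String getTableName() { return tableName; }
    int getQueryTimeoutSecs() { return queryTimeoutSecs; }
}
```

With this shape, `new ExampleJdbcBolt("jdbc.conf", "user_details").withQueryTimeoutSecs(5)` reads naturally (both strings are illustrative), and a missing required value fails as soon as the topology is built.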




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24014770
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+private String tableName;
+private JdbcMapper jdbcMapper;
+
+public JdbcBolt(String configKey) {
+super(configKey);
+}
+
+public JdbcBolt withTableName(String tableName) {
+this.tableName = tableName;
+return this;
+}
+
+public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+this.jdbcMapper = jdbcMapper;
+return this;
+}
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcMapper.getColumns(tuple);
+List<List<Column>> columnLists = new ArrayList<List<Column>>();
+columnLists.add(columns);
+this.jdbcClient.insert(this.tableName, columnLists);
+} catch (Exception e) {
+LOG.warn("Failing tuple.", e);
+this.collector.fail(tuple);
--- End diff --

I agree with calling reportError before the fail.  This lets the error show
up in the UI, not just in the logs.  reportError will also log it for you, so
you don't need the LOG.warn above.
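A sketch of the suggested ordering, assuming hypothetical `Collector` and `Writer` interfaces that stand in for the relevant part of Storm's `OutputCollector` (which does provide `ack`, `fail` and `reportError`) and for the JDBC insert. The error is reported first, the tuple is then failed, and no separate log call is made.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the subset of Storm's OutputCollector used here.
interface Collector {
    void ack(Object tuple);
    void fail(Object tuple);
    void reportError(Throwable error);
}

// Hypothetical stand-in for the database write performed by the bolt.
interface Writer {
    void insert(Object tuple) throws Exception;
}

class ErrorHandlingSketch {
    static void execute(Object tuple, Writer writer, Collector collector) {
        try {
            writer.insert(tuple);
            collector.ack(tuple);
        } catch (Exception e) {
            // Report first so the error is visible outside the worker logs,
            // then fail the tuple so it can be replayed.
            collector.reportError(e);
            collector.fail(tuple);
        }
    }
}

// Records calls in order, for demonstration only.
class RecordingCollector implements Collector {
    final List<String> calls = new ArrayList<>();
    public void ack(Object tuple) { calls.add("ack"); }
    public void fail(Object tuple) { calls.add("fail"); }
    public void reportError(Throwable error) { calls.add("reportError:" + error.getMessage()); }
}
```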




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24015210
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
 ---
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from any database.
+ */
+public class JdbcLookupBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
+
+private String selectQuery;
+
+private JdbcLookupMapper jdbcLookupMapper;
+
+public JdbcLookupBolt withConfigKey(String configKey) {
+this.configKey = configKey;
+return this;
+}
+
+public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+this.jdbcLookupMapper = jdbcLookupMapper;
+return this;
+}
+
+public JdbcLookupBolt withSelectSql(String selectQuery) {
+this.selectQuery = selectQuery;
+return this;
+}
+
+public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+this.queryTimeoutSecs = queryTimeoutSecs;
+return this;
+}
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcLookupMapper.getColumns(tuple);
+List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
+
+if (result != null && result.size() != 0) {
+for (List<Column> row : result) {
+List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
+for (Values value : values) {
+collector.emit(value);
--- End diff --

Please anchor these emits to the incoming tuple.
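In Storm, anchoring means passing the input tuple as the first argument, i.e. `collector.emit(tuple, value)` instead of `collector.emit(value)`; `Values` is a `List<Object>`, so it fits that overload. The sketch below shows the shape of the loop against a hypothetical `AnchoringCollector` interface standing in for `OutputCollector`: every emitted row is tied to the same input, so a downstream failure can trigger a replay of that input when the spout emits with message IDs.

```java
import java.util.List;

// Hypothetical stand-in for Storm's OutputCollector#emit(anchor, values).
interface AnchoringCollector {
    void emit(Object anchor, List<Object> values);
}

class AnchoredEmitSketch {
    // Emit one output per looked-up row, each anchored to the input tuple.
    static void emitRows(Object input, List<List<Object>> rows, AnchoringCollector collector) {
        for (List<Object> row : rows) {
            collector.emit(input, row);  // anchored: emit(anchor, values)
        }
    }
}
```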




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24016216
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
+
+private HikariDataSource dataSource;
+private int queryTimeoutSecs;
+
+public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
+this.queryTimeoutSecs = queryTimeoutSecs;
+}
+
+public int insert(String tableName, List<List<Column>> columnLists) {
+Connection connection = null;
+try {
+connection = this.dataSource.getConnection();
+StringBuilder sb = new StringBuilder();
+sb.append("Insert into ").append(tableName).append(" (");
+Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+@Override
+public String apply(Column input) {
+return input.getColumnName();
+}
+});
+String columns = Joiner.on(",").join(columnNames);
+sb.append(columns).append(") values ( ");
+
+String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+sb.append(placeHolders).append(")");
--- End diff --

Building the query through string manipulation on every call seems like a
real waste to me.  Why are we not using JDBC stored queries instead of, or in
addition to, this?  Also, even though SQL is a standard, there are always
inconsistencies between different databases.  It feels like we are
reinventing the wheel here and missing some things.
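One way to avoid rebuilding the INSERT text for every tuple is to build it once per table and column list and cache it. A sketch, using a hypothetical `InsertSqlCache` helper that is not part of storm-jdbc:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: builds the INSERT text once per (table, column list)
// and reuses it on later calls.
class InsertSqlCache {
    private final Map<List<String>, String> cache = new ConcurrentHashMap<>();

    String insertSql(String tableName, List<String> columnNames) {
        // Key is [table, col1, col2, ...], wrapped so it cannot be mutated.
        List<String> key = new ArrayList<>(columnNames.size() + 1);
        key.add(tableName);
        key.addAll(columnNames);
        return cache.computeIfAbsent(Collections.unmodifiableList(key),
                k -> build(tableName, columnNames));
    }

    private static String build(String tableName, List<String> columnNames) {
        // One "?" per column; values are bound later through a PreparedStatement.
        String placeholders = String.join(",", Collections.nCopies(columnNames.size(), "?"));
        return "INSERT INTO " + tableName + " (" + String.join(",", columnNames)
                + ") VALUES (" + placeholders + ")";
    }

    int size() { return cache.size(); }
}
```

The cached string can be handed to `Connection.prepareStatement(sql)`, with each value bound via `PreparedStatement.setObject(index, value, sqlType)`. This only removes the repeated string building; it does not address the dialect differences raised above.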




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24037306
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+private String tableName;
+private JdbcMapper jdbcMapper;
+
+public JdbcBolt withConfigKey(String configKey) {
--- End diff --

I think the idea behind the config key here, as in other modules, is to allow
multiple instances of the bolt to use different configurations (e.g. pointing
to different databases, schemas, etc.).

That being said, I'm not really partial to using either a builder method or 
constructor arg, as long as it's exposed as an option. Either way, I'd like to 
try to keep it consistent across modules so the APIs have a similar feel. One 
option for the builder method would be to define a default config key value 
that gets used unless overridden with `withConfigKey()`. We should also clearly 
document this behavior.
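A sketch of the default-key option, using a hypothetical `DefaultKeyBolt` and an illustrative default of `"jdbc.config"`: every instance reads the default key unless `withConfigKey()` points it elsewhere, so the multiple-database case stays possible without making the key mandatory.

```java
// Hypothetical sketch; the class name and the "jdbc.config" default are
// illustrative, not part of storm-jdbc.
class DefaultKeyBolt {
    /** Documented default: the topology-config key used unless overridden. */
    static final String DEFAULT_CONFIG_KEY = "jdbc.config";

    private String configKey = DEFAULT_CONFIG_KEY;

    /** Override the default so this instance reads a different database's settings. */
    DefaultKeyBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    String getConfigKey() {
        return configKey;
    }
}
```

Two bolts in one topology could then read `jdbc.config` and, say, `jdbc.orders` respectively.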




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24039560
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
 ---
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from any database.
+ */
+public class JdbcLookupBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
+
+private String selectQuery;
+
+private JdbcLookupMapper jdbcLookupMapper;
+
+public JdbcLookupBolt withConfigKey(String configKey) {
+this.configKey = configKey;
+return this;
+}
+
+public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+this.jdbcLookupMapper = jdbcLookupMapper;
+return this;
+}
+
+public JdbcLookupBolt withSelectSql(String selectQuery) {
+this.selectQuery = selectQuery;
+return this;
+}
+
+public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+this.queryTimeoutSecs = queryTimeoutSecs;
+return this;
+}
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcLookupMapper.getColumns(tuple);
+List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
+
+if (result != null && result.size() != 0) {
+for (List<Column> row : result) {
+List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
+for (Values value : values) {
+collector.emit(value);
--- End diff --

Done.




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24039215
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * p/
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+private String tableName;
+private JdbcMapper jdbcMapper;
+
+public JdbcBolt withConfigKey(String configKey) {
--- End diff --

Moved back to the constructor. I am not sure what you mean when you say we
could just pass the config into the bolt. Hikari has many configuration
options (https://github.com/brettwooldridge/HikariCP), and the configKey lets
the user provide any subset of them. The configKey is just the name of a key in
your submitted storm conf; its value should be a map of Hikari config keys to
their corresponding values.
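The structure described here is a nested map: one entry in the storm conf whose value is itself a map of Hikari settings. A sketch of reading that entry back into `java.util.Properties`, the type the `JdbcClient` constructor quoted earlier hands to `HikariConfig`; the `HikariConfigLookup` name is hypothetical. Values are copied as strings, a conservative choice because `Properties.getProperty` returns `null` for non-`String` values.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: resolves the map stored under configKey in the
// topology config and copies it into Properties.
class HikariConfigLookup {
    static Properties hikariProperties(Map<String, Object> stormConf, String configKey) {
        Object nested = stormConf.get(configKey);
        if (!(nested instanceof Map)) {
            throw new IllegalArgumentException("Expected a map under config key '" + configKey + "'");
        }
        Properties props = new Properties();
        for (Map.Entry<?, ?> e : ((Map<?, ?>) nested).entrySet()) {
            // Store both key and value as strings so getProperty() can see them.
            props.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
        }
        return props;
    }
}
```

With illustrative settings such as `dataSourceClassName`, `dataSource.url` and `maximumPoolSize` stored under `"jdbc.conf"`, `hikariProperties(conf, "jdbc.conf")` yields three string properties.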

I can extract a ConnectionProvider interface and create a Hikari
implementation that users construct to initialize the bolts. This would mean
the bolts no longer have a config key, users could use whatever connection
pool they are most comfortable with, and the configuration details would live
in the connection provider. Do you think that would be better than what we
currently have?
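A sketch of what such an interface could look like; `ConnectionProvider`, its three methods and the `DriverManagerConnectionProvider` implementation are hypothetical names for illustration. The provider is `Serializable` because it travels with the bolt; anything non-serializable, such as a pool, should be created in `prepare()`. A Hikari-backed version would build its `HikariDataSource` there (held in a `transient` field) and call `close()` on it in `cleanup()`.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical interface: bolts receive a provider instead of a config key,
// so the pooling library becomes an implementation detail.
interface ConnectionProvider extends Serializable {
    /** Called once per bolt instance, e.g. from the bolt's prepare(). */
    void prepare();
    /** Returns a connection; the caller is expected to close it. */
    Connection getConnection() throws SQLException;
    /** Called on shutdown, e.g. from the bolt's cleanup(). */
    void cleanup();
}

// Deliberately simple, non-pooling implementation backed by DriverManager.
class DriverManagerConnectionProvider implements ConnectionProvider {
    private static final long serialVersionUID = 1L;
    private final String jdbcUrl;
    private boolean prepared;

    DriverManagerConnectionProvider(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    public void prepare() {
        prepared = true;
    }

    public Connection getConnection() throws SQLException {
        if (!prepared) {
            throw new IllegalStateException("prepare() was not called");
        }
        return DriverManager.getConnection(jdbcUrl);
    }

    public void cleanup() {
        prepared = false;
    }
}
```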




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24039240
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * p/
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
--- End diff --

Done.




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24037432
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
--- End diff --

I agree with @revans2. While this deviates from some of the other older 
modules, I think this is a good convention to follow moving forward.




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24039341
  
--- Diff: external/storm-jdbc/pom.xml ---
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+
+<parent>
+<artifactId>storm</artifactId>
+<groupId>org.apache.storm</groupId>
+<version>0.10.0-SNAPSHOT</version>
+<relativePath>../../pom.xml</relativePath>
+</parent>
+
+<artifactId>storm-jdbc</artifactId>
+
+<developers>
+<developer>
+<id>Parth-Brahmbhatt</id>
+<name>Parth Brahmbhatt</name>
+<email>brahmbhatt.pa...@gmail.com</email>
+</developer>
+</developers>
+
+<properties>
+<hikari.version>2.2.5</hikari.version>
+</properties>
+
+<dependencies>
+<dependency>
+<groupId>org.apache.storm</groupId>
+<artifactId>storm-core</artifactId>
+<version>${project.version}</version>
+<scope>provided</scope>
+</dependency>
+<dependency>
+<groupId>org.apache.commons</groupId>
+<artifactId>commons-lang3</artifactId>
+<version>3.3</version>
+</dependency>
+<dependency>
+<groupId>com.google.guava</groupId>
+<artifactId>guava</artifactId>
+<version>17.0</version>
--- End diff --

Is there a way we can use the Guava dependency already included with Storm?
(I would understand if not; just double-checking.)




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24040425
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
+
+private HikariDataSource dataSource;
+private int queryTimeoutSecs;
+
+public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
+this.queryTimeoutSecs = queryTimeoutSecs;
+}
+
+public int insert(String tableName, List<List<Column>> columnLists) {
+Connection connection = null;
+try {
+connection = this.dataSource.getConnection();
+StringBuilder sb = new StringBuilder();
+sb.append("Insert into ").append(tableName).append(" (");
+Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+@Override
+public String apply(Column input) {
+return input.getColumnName();
+}
+});
+String columns = Joiner.on(",").join(columnNames);
+sb.append(columns).append(") values ( ");
+
+String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+sb.append(placeHolders).append(")");
+
+String query = sb.toString();
+if(LOG.isDebugEnabled()) {
+LOG.debug("Executing query " + query);
--- End diff --

Done.




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24040287
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+public class Column<T> implements Serializable {
+
+private String columnName;
+private T val;
+private int sqlType;
+
+public Column(String columnName, T val, int sqlType) {
--- End diff --

I have added a javadoc that should help clarify things. If we consider a
database table as a list of rows and each row as a list of columns, this class
represents a single instance of a column. So for a row
[userId = 1, userName = "Foo"] we will have two instances of the Column class:
new Column("userId", 1, Types.INTEGER) and
new Column("userName", "Foo", Types.VARCHAR).

I am not sure why Java's JDBC API went with integers to represent SQL types
instead of more readable enums, but the sqlType value maps to a constant in
java.sql.Types, where each constant represents a database type.
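A minimal sketch of such a value holder, not the actual storm-jdbc `Column`: a name, a value and a `java.sql.Types` constant, plus the `[userId = 1, userName = "Foo"]` row built as two instances (`ColumnExample` is a hypothetical helper). `equals` and `hashCode` go through `java.util.Objects`, so a column holding `null` (a SQL NULL) can still be used as a map key.

```java
import java.io.Serializable;
import java.sql.Types;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Sketch of a single column value: name, value, and a java.sql.Types constant.
class Column<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String columnName;
    private final T val;        // should itself be Serializable to ship with a tuple
    private final int sqlType;  // one of the java.sql.Types constants

    Column(String columnName, T val, int sqlType) {
        this.columnName = columnName;
        this.val = val;
        this.sqlType = sqlType;
    }

    String getColumnName() { return columnName; }
    T getVal() { return val; }
    int getSqlType() { return sqlType; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Column)) return false;
        Column<?> other = (Column<?>) o;
        return sqlType == other.sqlType
                && Objects.equals(columnName, other.columnName)
                && Objects.equals(val, other.val);
    }

    @Override
    public int hashCode() {
        // Objects.hash tolerates null elements, unlike val.hashCode().
        return Objects.hash(columnName, val, sqlType);
    }
}

class ColumnExample {
    // The row [userId = 1, userName = "Foo"] as a list of two Column instances.
    static List<Column<?>> exampleRow() {
        return Arrays.<Column<?>>asList(
                new Column<Integer>("userId", 1, Types.INTEGER),
                new Column<String>("userName", "Foo", Types.VARCHAR));
    }
}
```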




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-02-03 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r24041005
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
+
+    private HikariDataSource dataSource;
+    private int queryTimeoutSecs;
+
+    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+        Properties properties = new Properties();
+        properties.putAll(map);
+        HikariConfig config = new HikariConfig(properties);
+        this.dataSource = new HikariDataSource(config);
+        this.queryTimeoutSecs = queryTimeoutSecs;
+    }
+
+    public int insert(String tableName, List<List<Column>> columnLists) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            StringBuilder sb = new StringBuilder();
+            sb.append("Insert into ").append(tableName).append(" (");
+            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+                @Override
+                public String apply(Column input) {
+                    return input.getColumnName();
+                }
+            });
+            String columns = Joiner.on(",").join(columnNames);
+            sb.append(columns).append(") values ( ");
+
+            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+            sb.append(placeHolders).append(")");
--- End diff --

We are using prepared statements with `?` placeholders, so we are already 
leveraging the statement cache; I'm not sure whether you mean something else by 
JDBC stored queries. The statement cache is generally on the server side, and as 
long as we use prepared statements with `?` placeholders we should get the 
benefit of not having to compile/parse the SQL, plus the cached execution plan.

The only reason we construct the SQL query each time is that this is more 
flexible than a single static SQL statement. For inserts, people can have 
columns with default values, and their topology can choose to emit values for 
those columns sometimes and omit them at other times; both scenarios will be 
supported.

I could add an option for users to provide an override SQL statement, but 
personally I would rather wait until we actually come across a DB or user that 
runs into an issue before adding that.
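The dynamic-SQL point above can be sketched with plain JDBC. This is a minimal illustration built around a hypothetical `InsertSqlSketch` helper, not the storm-jdbc `JdbcClient` API. The SQL text depends only on which columns a row carries, and values are always bound through `?` placeholders. Whether a particular driver or server actually reuses a cached plan for repeated statement text depends on the database and its driver configuration.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; this is not the storm-jdbc JdbcClient API.
public final class InsertSqlSketch {

    private InsertSqlSketch() {}

    // Builds "insert into <table> (c1,c2,...) values (?,?,...)" from the columns a row carries.
    static String buildInsertSql(String tableName, List<String> columnNames) {
        if (columnNames.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columns = String.join(",", columnNames);
        String placeholders = String.join(",", Collections.nCopies(columnNames.size(), "?"));
        return "insert into " + tableName + " (" + columns + ") values (" + placeholders + ")";
    }

    // Binds values to the ? placeholders; values are never concatenated into the SQL text.
    static int insertRow(Connection connection, String tableName,
                         List<String> columnNames, List<?> values) throws SQLException {
        if (columnNames.size() != values.size()) {
            throw new IllegalArgumentException("one value per column is required");
        }
        String sql = buildInsertSql(tableName, columnNames);
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            for (int i = 0; i < values.size(); i++) {
                ps.setObject(i + 1, values.get(i)); // JDBC parameter indexes start at 1
            }
            return ps.executeUpdate();
        }
    }

    public static void main(String[] args) {
        // A row without create_time leaves that column to its DEFAULT in the database.
        System.out.println(buildInsertSql("user_details", Arrays.asList("user_id", "user_name")));
        // prints: insert into user_details (user_id,user_name) values (?,?)
    }
}
```

Rows that carry the same set of columns produce identical statement text. That is what allows any statement or plan cache to apply while still supporting optional, defaulted columns.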


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-26 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-71579088
  
@revans2 @ptgoetz can you take a look at this PR. Thanks.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-21 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r23339923
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
--- End diff --

@itaifrenkel fixed.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-20 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r23282556
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java 
---
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
--- End diff --

The name of the file has different casing than the name of the class.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22962128
  
--- Diff: external/storm-jdbc/README.md ---
@@ -0,0 +1,208 @@
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+
+## Inserting into a database.
+The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
+The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
+
+```java
+public interface JdbcMapper extends Serializable {
+    List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
+
+### SimpleJdbcMapper
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map a Storm
+tuple to a database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the columns in 
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool with the specified database configuration and
+automatically figure out the column names and corresponding data types of the table that you intend to write to. 
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user", "root");
+hikariConfigMap.put("dataSource.password", "password");
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, hikariConfigMap);
+```
+The mapper initialized in the example above assumes a storm tuple has a value for all the columns. 
+If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values 
+and you want to only insert values for columns with no default values, you can enforce the behavior by initializing the 
+`SimpleJdbcMapper` with an explicit column schema. For example, if you have a user_details table 
+`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+
+### JdbcBolt
+To use the `JdbcBolt`, construct it with the configuration key in your storm config that holds the hikari configuration map.
+In addition, you must specify the JdbcMapper implementation to convert a storm tuple to a DB row, and the table name into which 
+the rows will be inserted.
+
+```java
+Config config = new Config();
+config.put("jdbc.conf", hikariConfigMap);
+JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+        .withTableName("user_details")
+        .withJdbcMapper(simpleJdbcMapper);
+```
+### JdbcTridentState
+We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
+state you need to initialize it with the table name, the JdbcMapper instance and the name of the storm config key that holds the
+hikari configuration map. See the example below:
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConfigKey("jdbc.conf")
+        .withMapper(jdbcMapper)
+        .withTableName("user_details");
+
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
+
+## Lookup from 

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22962179
  
--- Diff: 
external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
 ---
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import backtype.storm.LocalCluster;
+
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractUserTopology {
+    private static final List<String> setupSqls = Lists.newArrayList(
+            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
+            "create table if not exists department (dept_id integer, dept_name varchar(100))",
+            "create table if not exists user_department (user_id integer, dept_id integer)",
+            "insert into department values (1, 'R&D')",
+            "insert into department values (2, 'Finance')",
+            "insert into department values (3, 'HR')",
+            "insert into department values (4, 'Sales')",
+            "insert into user_department values (1, 1)",
+            "insert into user_department values (2, 2)",
+            "insert into user_department values (3, 3)",
+            "insert into user_department values (4, 4)"
+    );
+    protected UserSpout userSpout;
+    protected JdbcMapper jdbcMapper;
+    protected JdbcLookupMapper jdbcLookupMapper;
+
+    protected static final String TABLE_NAME = "user";
+    protected static final String JDBC_CONF = "jdbc.conf";
+    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
+            " and user_department.user_id = ?";
+
+    public void execute(String[] args) throws Exception {
+        if (args.length != 4 && args.length != 5) {
+            System.out.println("Usage: " + this.getClass().getSimpleName() + " dataSourceClassName dataSource.url "
+                    + "user password [topology name]");
+            System.exit(-1);
+        }
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user", args[2]);//root
+        map.put("dataSource.password", args[3]);//password
+
+        Config config = new Config();
+        config.put(JDBC_CONF, map);
+
+        JDBCClient jdbcClient = new JDBCClient(map);
+        for (String sql : setupSqls) {
+            jdbcClient.executeSql(sql);
+        }
+
+        this.userSpout = new UserSpout();
+        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
+        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
+        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+        this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+
+        if (args.length == 4) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, getTopology());
+            Thread.sleep(3);
+

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22972354
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
 ---
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.commons.lang.Validate;
+import org.apache.storm.jdbc.common.JDBCClient;
--- End diff --

Notice that JDBCClient and JdbcMapper have different casing conventions.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/374#issuecomment-69978982
  
Overall I am +1 on merging this. There are minor nitpicks in the comments 
above. I would like to see unit tests for the bolt and state; we can probably 
use an in-process JDBC database to test them. We can do this as part of another 
JIRA.
@ptgoetz @revans2 please take a look at the PR. If we need a sponsor for 
jdbc, I can volunteer. Thanks.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22962079
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
--- End diff --

fixed.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22962071
  
--- Diff: external/storm-jdbc/README.md ---
@@ -0,0 +1,208 @@
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+
+## Inserting into a database.
+The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
--- End diff --

fixed.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22962868
  
--- Diff: 
external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
 ---
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import backtype.storm.LocalCluster;
+
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractUserTopology {
+    private static final List<String> setupSqls = Lists.newArrayList(
+            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
+            "create table if not exists department (dept_id integer, dept_name varchar(100))",
+            "create table if not exists user_department (user_id integer, dept_id integer)",
+            "insert into department values (1, 'R&D')",
+            "insert into department values (2, 'Finance')",
+            "insert into department values (3, 'HR')",
+            "insert into department values (4, 'Sales')",
+            "insert into user_department values (1, 1)",
+            "insert into user_department values (2, 2)",
+            "insert into user_department values (3, 3)",
+            "insert into user_department values (4, 4)"
+    );
+    protected UserSpout userSpout;
+    protected JdbcMapper jdbcMapper;
+    protected JdbcLookupMapper jdbcLookupMapper;
+
+    protected static final String TABLE_NAME = "user";
+    protected static final String JDBC_CONF = "jdbc.conf";
+    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
+            " and user_department.user_id = ?";
+
+    public void execute(String[] args) throws Exception {
+        if (args.length != 4 && args.length != 5) {
+            System.out.println("Usage: " + this.getClass().getSimpleName() + " dataSourceClassName dataSource.url "
+                    + "user password [topology name]");
+            System.exit(-1);
+        }
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user", args[2]);//root
+        map.put("dataSource.password", args[3]);//password
+
+        Config config = new Config();
+        config.put(JDBC_CONF, map);
+
+        JDBCClient jdbcClient = new JDBCClient(map);
+        for (String sql : setupSqls) {
+            jdbcClient.executeSql(sql);
+        }
+
+        this.userSpout = new UserSpout();
+        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
+        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
+        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+        this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+
+        if (args.length == 4) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, getTopology());
+            Thread.sleep(3);
+

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22974954
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+    private String tableName;
+    private JdbcMapper jdbcMapper;
+
+    public JdbcBolt(String configKey) {
+        super(configKey);
+    }
+
+    public JdbcBolt withTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+        this.jdbcMapper = jdbcMapper;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<Column> columns = jdbcMapper.getColumns(tuple);
+            List<List<Column>> columnLists = new ArrayList<List<Column>>();
+            columnLists.add(columns);
+            this.jdbcClient.insert(this.tableName, columnLists);
+        } catch (Exception e) {
+            LOG.warn("Failing tuple.", e);
+            this.collector.fail(tuple);

I don't see this used in any other connector; the other connectors 
(hdfs/hbase/kafka) all just log the error. I am fine with adding it, since it 
reports the error so the UI can display it, but removing the log statement may 
lose information, which can make debugging harder. I propose that we report the 
error and also log it.
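The proposal above (log the error and also report it) can be sketched as follows. This is a minimal, self-contained illustration. `Collector` and `Writer` are hypothetical stand-ins for Storm's `OutputCollector` and the JDBC insert, and `java.util.logging` replaces SLF4J so that the example runs without dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: Collector and Writer are stand-ins, not Storm's OutputCollector or JdbcClient.
public class ReportAndLogSketch {

    // Mirrors the three collector calls a writing bolt makes.
    interface Collector {
        void ack(Object tuple);
        void reportError(Throwable error);
        void fail(Object tuple);
    }

    // Stand-in for the database insert, which may throw.
    interface Writer {
        void write(Object tuple) throws Exception;
    }

    private static final Logger LOG = Logger.getLogger(ReportAndLogSketch.class.getName());

    static void execute(Object tuple, Writer writer, Collector collector) {
        try {
            writer.write(tuple);
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Failing tuple.", e); // full stack trace stays in the worker log
            collector.reportError(e);                     // error is also surfaced for the UI
            collector.fail(tuple);                        // a reliable spout may then replay the tuple
            return;
        }
        collector.ack(tuple);
    }

    public static void main(String[] args) {
        final List<String> calls = new ArrayList<String>();
        Collector recording = new Collector() {
            @Override public void ack(Object tuple) { calls.add("ack"); }
            @Override public void reportError(Throwable error) { calls.add("reportError"); }
            @Override public void fail(Object tuple) { calls.add("fail"); }
        };
        execute("tuple-1", t -> { throw new java.sql.SQLException("insert failed"); }, recording);
        System.out.println(calls); // prints: [reportError, fail]
    }
}
```

Logging and reporting are complementary. The log keeps the full stack trace for debugging, and the reported error gives operators a quick signal in the UI.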


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22973996
  
--- Diff: 
external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java 
---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JDBCClient {
+private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
+
+private HikariDataSource dataSource;
+
+public JDBCClient(Map<String, Object> map) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
--- End diff --

yes.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22978604
  
--- Diff: external/storm-jdbc/README.md ---
@@ -0,0 +1,208 @@
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+
+## Inserting into a database.
+The bolt and trident state included in this package for inserting data into a database table are tied to a single table.
+The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
+
+```java
+public interface JdbcMapper extends Serializable {
+List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
+
+### SimpleJdbcMapper
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map a Storm
+tuple to a database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the columns in 
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool with the specified database configuration and
+automatically figure out the column names and corresponding data types of the table that you intend to write to. 
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, hikariConfigMap);
+```
+The mapper initialized in the example above assumes a storm tuple has a value for all the columns. 
+If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values 
+and you want to only insert values for columns with no default values, you can enforce the behavior by initializing the 
+`SimpleJdbcMapper` with an explicit column schema. For example, if you have a user_details table 
+`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+new Column("user_id", java.sql.Types.INTEGER),
+new Column("user_name", java.sql.Types.VARCHAR),
+new Column("dept_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+
+### JdbcBolt
+To use the `JdbcBolt`, construct it with the configuration key in your storm config that holds the hikari configuration map.
+In addition you must specify the JdbcMapper implementation to convert a storm tuple to a DB row and the table name in which 
+the rows will be inserted.
+
+ ```java
+Config config = new Config();
+config.put("jdbc.conf", hikariConfigMap);
+JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+.withTableName("user_details")
+.withJdbcMapper(simpleJdbcMapper);
+ ```
+### JdbcTridentState
+We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
+state you need to initialize it with the table name, the JdbcMapper instance and the name of the storm config key that holds the
+hikari configuration map. See the example below:
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+.withConfigKey("jdbc.conf")
+.withMapper(jdbcMapper)
+.withTableName("user_details");
+
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
+
+## Lookup from 

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22978468
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JDBCClient {
+private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
+
+private HikariDataSource dataSource;
+
+public JDBCClient(Map<String, Object> map) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
+}
+
+public int insert(String tableName, List<List<Column>> columnLists) {
+Connection connection = null;
+try {
+connection = this.dataSource.getConnection();
+StringBuilder sb = new StringBuilder();
+sb.append("Insert into ").append(tableName).append(" (");
+Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+@Override
+public String apply(Column input) {
+return input.getColumnName();
+}
+});
+String columns = Joiner.on(",").join(columnNames);
+sb.append(columns).append(") values ( ");
+
+String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+sb.append(placeHolders).append(")");
+
+String query = sb.toString();
+if(LOG.isDebugEnabled()) {
+LOG.debug("Executing query " + query);
+}
+
+PreparedStatement preparedStatement = connection.prepareStatement(query);
+for(List<Column> columnList : columnLists) {
+setPreparedStatementParams(preparedStatement, columnList);
+}
+
+return preparedStatement.executeUpdate();
--- End diff --

Added a new config, queryTimeOutSecs, which defaults to 30 seconds, the same as
the default topology.message.timeout.secs. Users can override it during bolt
construction.
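Below is a sketch of how the configurable timeout described above might be applied, using only `java.sql`. `QueryTimeoutSettings`, `withQueryTimeoutSecs` and `prepare` are hypothetical names and may not match what the PR adds. The key step is calling `Statement.setQueryTimeout(int)` before `executeUpdate()`. That lets the driver abandon a query that exceeds the limit, subject to driver support, instead of blocking the bolt indefinitely.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical settings holder; the PR's actual field and builder names may differ.
class QueryTimeoutSettings {
    // 30 s mirrors the default mentioned above (Storm's default topology.message.timeout.secs).
    static final int DEFAULT_QUERY_TIMEOUT_SECS = 30;

    private int queryTimeoutSecs = DEFAULT_QUERY_TIMEOUT_SECS;

    // Builder-style override, in the spirit of "user can override during bolt construction".
    QueryTimeoutSettings withQueryTimeoutSecs(int queryTimeoutSecs) {
        if (queryTimeoutSecs < 0) {
            throw new IllegalArgumentException("queryTimeoutSecs must be >= 0");
        }
        this.queryTimeoutSecs = queryTimeoutSecs;
        return this;
    }

    int getQueryTimeoutSecs() {
        return queryTimeoutSecs;
    }

    // Prepares the statement and bounds how long the driver may wait for it.
    // Per java.sql.Statement#setQueryTimeout, 0 means "no limit".
    PreparedStatement prepare(Connection connection, String sql) throws SQLException {
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setQueryTimeout(queryTimeoutSecs);
        return statement;
    }
}
```

Keeping the default at or below the message timeout seems sensible. If a tuple's query outlives `topology.message.timeout.secs`, the tuple times out and may be replayed by the spout anyway.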


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22973617
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from any database.
+ */
+public class JdbcLookupBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
+
+private String selectQuery;
+
+private JdbcLookupMapper jdbcLookupMapper;
+
+public JdbcLookupBolt(String configKey) {
+super(configKey);
+}
+
+public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+this.jdbcLookupMapper = jdbcLookupMapper;
+return this;
+}
+
+public JdbcLookupBolt withSelectSql(String selectQuery) {
+this.selectQuery = selectQuery;
+return this;
+}
+
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcLookupMapper.getColumns(tuple);
+List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
+
+if (result != null && result.size() != 0) {
+for (List<Column> row : result) {
+List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
+for (Values value : values) {
+collector.emit(value);
+}
+}
+}
+this.collector.ack(tuple);
+} catch (Exception e) {
+LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
+this.collector.fail(tuple);
--- End diff --

Another approach is to derive from IBasicBolt and let the default error 
handling kick in.
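The `IBasicBolt` suggestion can be illustrated as follows. As we understand Storm's `BasicBoltExecutor`, a basic bolt's input tuple is acked automatically once `execute` returns. It is failed when `execute` throws a `FailedException`, and `reportError` is called first if the exception is a `ReportedFailedException`. The self-contained model below mimics that contract. Every type in it is a hypothetical stand-in, not the Storm class of the same name. In a lookup bolt, the body would catch the `SQLException` and rethrow it wrapped in a `ReportedFailedException`.

```java
// Hypothetical stand-ins mirroring Storm's FailedException / ReportedFailedException.
class FailedException extends RuntimeException {
    FailedException(Throwable cause) { super(cause); }
}

class ReportedFailedException extends FailedException {
    ReportedFailedException(Throwable cause) { super(cause); }
}

// Hypothetical stand-in for the ack/fail/reportError side of the collector.
interface AckingCollector {
    void ack(Object tuple);
    void fail(Object tuple);
    void reportError(Throwable error);
}

// The user-supplied logic, analogous to IBasicBolt.execute(tuple, collector).
interface BasicLogic {
    void execute(Object tuple);
}

// Simplified model of the ack/fail wrapper applied around a basic bolt.
class BasicExecutorModel {
    private final BasicLogic logic;
    private final AckingCollector collector;

    BasicExecutorModel(BasicLogic logic, AckingCollector collector) {
        this.logic = logic;
        this.collector = collector;
    }

    void execute(Object tuple) {
        try {
            logic.execute(tuple);
            collector.ack(tuple); // success: the tuple is acked without user code
        } catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                collector.reportError(e); // only "reported" failures reach the error display
            }
            collector.fail(tuple); // any FailedException fails the tuple
        }
        // Exceptions of any other type are not caught here and propagate to the caller.
    }
}
```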


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22973557
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from any database.
+ */
+public class JdbcLookupBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
+
+private String selectQuery;
+
+private JdbcLookupMapper jdbcLookupMapper;
+
+public JdbcLookupBolt(String configKey) {
+super(configKey);
+}
+
+public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+this.jdbcLookupMapper = jdbcLookupMapper;
+return this;
+}
+
+public JdbcLookupBolt withSelectSql(String selectQuery) {
+this.selectQuery = selectQuery;
+return this;
+}
+
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcLookupMapper.getColumns(tuple);
+List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
+
+if (result != null && result.size() != 0) {
+for (List<Column> row : result) {
+List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
+for (Values value : values) {
+collector.emit(value);
+}
+}
+}
+this.collector.ack(tuple);
+} catch (Exception e) {
+LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
--- End diff --

We usually do not log each exception; we let reportError decide what to do
with it. So to use this module we would have to turn off this logging
completely.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22973534
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+private String tableName;
+private JdbcMapper jdbcMapper;
+
+public JdbcBolt(String configKey) {
+super(configKey);
+}
+
+public JdbcBolt withTableName(String tableName) {
+this.tableName = tableName;
+return this;
+}
+
+public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+this.jdbcMapper = jdbcMapper;
+return this;
+}
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcMapper.getColumns(tuple);
+List<List<Column>> columnLists = new ArrayList<List<Column>>();
+columnLists.add(columns);
+this.jdbcClient.insert(this.tableName, columnLists);
+} catch (Exception e) {
+LOG.warn("Failing tuple.", e);
--- End diff --

We usually do not log each exception; we let reportError decide what to do
with it. So to use this module we would have to turn off this logging
completely.


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22974645
  
--- Diff: external/storm-jdbc/README.md ---
@@ -0,0 +1,208 @@
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+
+## Inserting into a database.
+The bolt and trident state included in this package for inserting data into a database table are tied to a single table.
+The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
+
+```java
+public interface JdbcMapper extends Serializable {
+List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
+
+### SimpleJdbcMapper
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map a Storm
+tuple to a database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the columns in 
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool with the specified database configuration and
+automatically figure out the column names and corresponding data types of the table that you intend to write to. 
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, hikariConfigMap);
+```
+The mapper initialized in the example above assumes a storm tuple has a value for all the columns. 
+If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values 
+and you want to only insert values for columns with no default values, you can enforce the behavior by initializing the 
+`SimpleJdbcMapper` with an explicit column schema. For example, if you have a user_details table 
+`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+new Column("user_id", java.sql.Types.INTEGER),
+new Column("user_name", java.sql.Types.VARCHAR),
+new Column("dept_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+
+### JdbcBolt
+To use the `JdbcBolt`, construct it with the configuration key in your storm config that holds the hikari configuration map.
+In addition you must specify the JdbcMapper implementation to convert a storm tuple to a DB row and the table name in which 
+the rows will be inserted.
+
+ ```java
+Config config = new Config();
+config.put("jdbc.conf", hikariConfigMap);
+JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+.withTableName("user_details")
+.withJdbcMapper(simpleJdbcMapper);
+ ```
+### JdbcTridentState
+We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
+state you need to initialize it with the table name, the JdbcMapper instance and the name of the storm config key that holds the
+hikari configuration map. See the example below:
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+.withConfigKey("jdbc.conf")
+.withMapper(jdbcMapper)
+.withTableName("user_details");
+
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
+
+## Lookup from 

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22973307
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+private String tableName;
+private JdbcMapper jdbcMapper;
+
+public JdbcBolt(String configKey) {
+super(configKey);
+}
+
+public JdbcBolt withTableName(String tableName) {
+this.tableName = tableName;
+return this;
+}
+
+public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+this.jdbcMapper = jdbcMapper;
+return this;
+}
+
+@Override
+public void execute(Tuple tuple) {
+try {
+List<Column> columns = jdbcMapper.getColumns(tuple);
+List<List<Column>> columnLists = new ArrayList<List<Column>>();
+columnLists.add(columns);
+this.jdbcClient.insert(this.tableName, columnLists);
+} catch (Exception e) {
+LOG.warn("Failing tuple.", e);
+this.collector.fail(tuple);
--- End diff --

this.collector.reportError(e) ?


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22972968
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JDBCClient {
+private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
+
+private HikariDataSource dataSource;
+
+public JDBCClient(Map<String, Object> map) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
--- End diff --

Each bolt has its own JDBCClient. Each JDBCClient has its own data source. 
Does it mean that each bolt has its own dedicated connection pool ?
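Each `JDBCClient` builds its own `HikariDataSource`, so every bolt instance (task) opens its own pool, and total connections grow with parallelism. If sharing is preferred, one option is a per-JVM registry keyed by the configuration map, sketched below. `SharedPoolRegistry` is a hypothetical helper, not part of the PR. Its factory is left generic so the example runs without HikariCP on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-JVM registry: tasks that pass an identical config map get the same pool.
// In a bolt, an instance would typically be held in a static field so that all tasks in
// one worker JVM see it.
final class SharedPoolRegistry<P> {
    private final ConcurrentHashMap<Map<String, Object>, P> pools = new ConcurrentHashMap<>();
    private final Function<Map<String, Object>, P> factory;

    SharedPoolRegistry(Function<Map<String, Object>, P> factory) {
        this.factory = factory;
    }

    P get(Map<String, Object> config) {
        // Snapshot the config so later changes to the caller's map cannot alter the key.
        Map<String, Object> key = Collections.unmodifiableMap(new HashMap<>(config));
        // computeIfAbsent is atomic, so at most one pool is created per distinct config.
        return pools.computeIfAbsent(key, factory);
    }

    int size() {
        return pools.size();
    }
}
```

With HikariCP, the factory could mirror the constructor quoted above: `cfg -> { Properties p = new Properties(); p.putAll(cfg); return new HikariDataSource(new HikariConfig(p)); }`. The sketch does not close pools on shutdown. With a shared pool, the configured maximum pool size also applies to all tasks in the worker together rather than per task.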


[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread itaifrenkel
Github user itaifrenkel commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22975021
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JDBCClient {
+private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
+
+private HikariDataSource dataSource;
+
+public JDBCClient(Map<String, Object> map) {
+Properties properties = new Properties();
+properties.putAll(map);
+HikariConfig config = new HikariConfig(properties);
+this.dataSource = new HikariDataSource(config);
+}
+
+public int insert(String tableName, List<List<Column>> columnLists) {
+Connection connection = null;
+try {
+connection = this.dataSource.getConnection();
+StringBuilder sb = new StringBuilder();
+sb.append("Insert into ").append(tableName).append(" (");
+Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+@Override
+public String apply(Column input) {
+return input.getColumnName();
+}
+});
+String columns = Joiner.on(",").join(columnNames);
+sb.append(columns).append(") values ( ");
+
+String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+sb.append(placeHolders).append(")");
+
+String query = sb.toString();
+if(LOG.isDebugEnabled()) {
+LOG.debug("Executing query " + query);
+}
+
+PreparedStatement preparedStatement = connection.prepareStatement(query);
+for(List<Column> columnList : columnLists) {
+setPreparedStatementParams(preparedStatement, columnList);
+}
+
+return preparedStatement.executeUpdate();
--- End diff --

executeUpdate could block for a very long time because 
preparedStatement.setQueryTimeout() was not called. The timeout should preferably be 
configurable.
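This is a minimal sketch of the suggested fix. It assumes the timeout, in seconds, is passed to the client when the client is created. `TimedInsert` and `queryTimeoutSecs` are hypothetical names and are not part of the storm-jdbc API.

The only JDBC call the sketch relies on is the standard `Statement.setQueryTimeout(int seconds)`. That call asks the driver to cancel a statement that runs longer than the limit; a value of 0 means no limit.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import javax.sql.DataSource;

// Hypothetical helper: runs a parameterized insert with a caller-supplied query timeout.
final class TimedInsert {
    private final DataSource dataSource;
    private final int queryTimeoutSecs; // 0 means "no limit" for setQueryTimeout

    TimedInsert(DataSource dataSource, int queryTimeoutSecs) {
        if (queryTimeoutSecs < 0) {
            throw new IllegalArgumentException("queryTimeoutSecs must be >= 0");
        }
        this.dataSource = dataSource;
        this.queryTimeoutSecs = queryTimeoutSecs;
    }

    int insert(String query, List<Object> params) throws SQLException {
        // try-with-resources closes the statement and returns the connection to the pool.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(query)) {
            // Bound how long executeUpdate may block before the driver cancels it.
            ps.setQueryTimeout(queryTimeoutSecs);
            for (int i = 0; i < params.size(); i++) {
                ps.setObject(i + 1, params.get(i)); // JDBC parameter indexes are 1-based
            }
            return ps.executeUpdate();
        }
    }
}
```

To make the limit configurable per topology, the bolt could accept it through a builder-style setter next to `withSelectSql` and pass it down to the client. The name of such a setter is left open here.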




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22974973
  
--- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from any database.
+ */
+public class JdbcLookupBolt extends AbstractJdbcBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
+
+    private String selectQuery;
+
+    private JdbcLookupMapper jdbcLookupMapper;
+
+    public JdbcLookupBolt(String configKey) {
+        super(configKey);
+    }
+
+    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+        this.jdbcLookupMapper = jdbcLookupMapper;
+        return this;
+    }
+
+    public JdbcLookupBolt withSelectSql(String selectQuery) {
+        this.selectQuery = selectQuery;
+        return this;
+    }
+
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
+            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
+
+            if (result != null && result.size() != 0) {
+                for (List<Column> row : result) {
+                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
+                    for (Values value : values) {
+                        collector.emit(value);
+                    }
+                }
+            }
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
+            this.collector.fail(tuple);
--- End diff --

same as above.




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-14 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/374#discussion_r22950826
  
--- Diff: external/storm-jdbc/README.md ---
@@ -0,0 +1,208 @@
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
+to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+
+## Inserting into a database.
+The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
--- End diff --

Typo: "trindet" should be "trident".




[GitHub] storm pull request: Storm 616 : Storm-jdbc connector.

2015-01-06 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/372#issuecomment-68871971
  
@Parth-Brahmbhatt it looks like you might have an unintended commit as part of 
this PR: STORM-586: TridentKafkaEmitter should catch updateOffsetException.




[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

2015-01-06 Thread Parth-Brahmbhatt
GitHub user Parth-Brahmbhatt opened a pull request:

https://github.com/apache/storm/pull/374

Storm-616 : Storm-jdbc connector.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-616

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #374


commit 5b160168c75c0e8c4c402a5e24f606dab697fbef
Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com
Date:   2015-01-06T03:14:18Z

STORM-616: Jdbc connector for storm.

commit ab9f778ae50a1e224ebdcc58e6249009fc1f91cc
Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com
Date:   2015-01-06T03:23:52Z

Merge remote-tracking branch 'upstream/master' into STORM-616

commit d260759ac203383e27668a7cb7090926029f7406
Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com
Date:   2015-01-06T03:31:05Z

STORM-616 : removing unintended changes.

commit 079deda496d16bd896611da706402c9df7e2319f
Author: Parth Brahmbhatt brahmbhatt.pa...@gmail.com
Date:   2015-01-06T17:47:40Z

STORM-616:Adding storm-jdbc as external module in pom. Adding links to hikariCP configuration in README.






[GitHub] storm pull request: Storm 616 : Storm-jdbc connector.

2015-01-06 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/372#discussion_r22536766
  
--- Diff: external/storm-jdbc/README.md ---
@@ -0,0 +1,117 @@
+#Storm HBase
+
+Storm/Trident integration for JDBC.
+
+## Usage
+The main API for interacting with JDBC is the `org.apache.storm.jdbc.mapper.TupleToColumnMapper`
+interface:
+
+```java
+public interface JdbcMapper  extends Serializable {
+    List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
+
+### SimpleJdbcMapper
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
+tuple to a Database row. `SimpleJdbcMapper` assumes that the tuple has fields with same name as the column name in
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
+automatically figure out the column names of the table that you intend to write to.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+String tableName = "user_details";
+JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+```
+### JdbcBolt
+To use the `JdbcBolt`, construct it with the name of the table to write to, and a `JdbcMapper` implementation. In addition
+you must specify a configuration key that hold the hikari configuration map.
+
+ ```java
+Config config = new Config();
+config.put("jdbc.conf", hikariConfigMap);
+
+JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper)
+    .withConfigKey("jdbc.conf");
--- End diff --

Is jdbc.conf a properties file? If so, withConfigKey could be renamed to 
withConfigFile. It would be great if you could add a sample jdbc.conf file to the 
README.




[GitHub] storm pull request: Storm 616 : Storm-jdbc connector.

2015-01-06 Thread Parth-Brahmbhatt
Github user Parth-Brahmbhatt commented on the pull request:

https://github.com/apache/storm/pull/372#issuecomment-68904233
  
* I noticed that and reverted those files to their state in master.
* Added storm-jdbc to storm-core.
* Sorry about this, I accidentally deleted it along with the mysql dependency that I 
added for testing. I have added it back.
* jdbc.conf is just a key in the storm config that holds a map for the hikariCP 
configuration. I have added a link to the HikariCP configuration document.
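This is a minimal sketch of what the reply describes, assuming the topology config is a plain `Map<String, Object>`. `jdbc.conf` is not a file: it is a key whose value is the HikariCP settings map. The client copies that map into `java.util.Properties`, which is what the quoted `JDBCClient` constructor does before it builds a `HikariConfig`.

`JdbcConfLookup` and `hikariProperties` are hypothetical names. The sketch stops short of creating the HikariCP pool, so it runs with the JDK alone.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: resolves the HikariCP settings stored under a config key.
final class JdbcConfLookup {
    private JdbcConfLookup() {}

    static Properties hikariProperties(Map<String, Object> stormConf, String configKey) {
        Object value = stormConf.get(configKey);
        // The key must point at a map of settings, not at a file name.
        if (!(value instanceof Map)) {
            throw new IllegalArgumentException(
                "Expected a map of HikariCP settings under key '" + configKey + "'");
        }
        Properties properties = new Properties();
        properties.putAll((Map<?, ?>) value); // same copy step as the JDBCClient constructor
        return properties;
    }
}
```

The `JDBCClient` constructor performs the next step itself: it passes the resulting `Properties` to `new HikariConfig(properties)`.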




[GitHub] storm pull request: Storm 616 : Storm-jdbc connector.

2015-01-06 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/372#issuecomment-68899213
  
@Parth-Brahmbhatt a few things to note:
1) Add external/storm-jdbc to storm/pom.xml under modules.
2) I didn't find any dependency in storm-jdbc/pom.xml for 
com.google.common, and it's causing compilation errors: "package 
com.google.common.base does not exist".

