[FLINK-3292] Fix for Bug in flink-jdbc. Not all JDBC drivers supported

This closes #1551


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d97fcda6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d97fcda6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d97fcda6

Branch: refs/heads/master
Commit: d97fcda6635b6821ac3f61c39e0fa156bc7c7fd4
Parents: 83b88c2
Author: Subhobrata Dey <sbc...@gmail.com>
Authored: Wed Jan 27 17:00:37 2016 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 28 14:37:53 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/jdbc/JDBCInputFormat.java     | 16 +++++++++++++++-
 .../flink/api/java/io/jdbc/JDBCInputFormatTest.java |  3 +++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java      |  3 +++
 3 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d97fcda6/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index eb3ac31..84eb309 100644
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -58,6 +58,8 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
        private String drivername;
        private String dbURL;
        private String query;
+       private int resultSetType;
+       private int resultSetConcurrency;
 
        private transient Connection dbConn;
        private transient Statement statement;
@@ -82,7 +84,7 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
        public void open(InputSplit ignored) throws IOException {
                try {
                        establishConnection();
-                       statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+                       statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
                        resultSet = statement.executeQuery(query);
                } catch (SQLException se) {
                        close();
@@ -308,6 +310,8 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
 
                public JDBCInputFormatBuilder() {
                        this.format = new JDBCInputFormat();
+                       this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
+                       this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
                }
 
                public JDBCInputFormatBuilder setUsername(String username) {
@@ -335,6 +339,16 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
                        return this;
                }
 
+               public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
+                       format.resultSetType = resultSetType;
+                       return this;
+               }
+
+               public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
+                       format.resultSetConcurrency = resultSetConcurrency;
+                       return this;
+               }
+
                public JDBCInputFormat finish() {
                        if (format.username == null) {
                                LOG.info("Username was not supplied separately.");

http://git-wip-us.apache.org/repos/asf/flink/blob/d97fcda6/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index b76f8b8..b1d43df 100644
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -23,6 +23,8 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.ResultSet;
+
 
 import org.junit.Assert;
 
@@ -172,6 +174,7 @@ public class JDBCInputFormatTest {
                                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                                .setDBUrl("jdbc:derby:memory:ebookshop")
                                .setQuery("select * from books")
+                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
                                .finish();
                jdbcInputFormat.open(null);
                Tuple5 tuple = new Tuple5();

http://git-wip-us.apache.org/repos/asf/flink/blob/d97fcda6/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 7d004f9..276518b 100644
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -23,6 +23,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.ResultSet;
 
 import org.junit.Assert;
 
@@ -199,6 +200,7 @@ public class JDBCOutputFormatTest {
                                .setDrivername(driverPath)
                                .setDBUrl(dbUrl)
                                .setQuery("select * from " + sourceTable)
+                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
                                .finish();
                jdbcInputFormat.open(null);
 
@@ -215,6 +217,7 @@ public class JDBCOutputFormatTest {
                                .setDrivername(driverPath)
                                .setDBUrl(dbUrl)
                                .setQuery("select * from " + targetTable)
+                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
                                .finish();
                jdbcInputFormat.open(null);
 

Reply via email to