I remove the open method when constructing jdbc input format, but I still obtain "couldn't access resultSet" error.
Caused by: java.io.IOException: Couldn't access resultSet at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:179) at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51) at org.apache.flink.streaming.api.functions.source.FileSourceFunction.run(FileSourceFunction.java:124) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:164) ... 7 more Anything I should check as well? Thanks On 5 June 2016 at 17:26, Chesnay Schepler <ches...@apache.org> wrote: > you are not supposed to call open yourselves. > > > On 05.06.2016 11:05, David Olsen wrote: > >> Following the sample on the flink website[1] to test jdbc I encountered >> an error "Couldn't access resultSet". It looks like the nextRecord is >> called before open() function. However I've called open() when I construct >> jdbc input format. Any functions I should call before job submission? >> >> def jdbc()= { >> val jdbcif = >> JDBCInputFormat.buildJDBCInputFormat.setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://localhost/test").setQuery("select >> name from department").setUsername(...).setPassword(...).finish >> jdbcif.open(null) >> jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]] >> } >> >> def main(args: Array[String]) { >> val env = StreamExecutionEnvironment.getExecutionEnvironment // -> >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]], >> STRING_TYPE_INFO) >> val stream = env.createInput(jdbc(), evidence$6) >> stream.map ( new MapFunction[Tuple1[String], String]() { >> override def map(tuple: Tuple1[String]): String = tuple.getField(0) >> }).returns(classOf[String]).writeAsText("/path/to/jdbc") >> env.execute("test-flink") >> } >> >> The version used in this test is flink 1.0.3 and scala 2.11. >> >> [1]. >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/ >> > >