After switching to org.apache.flink.api.java.ExecutionEnvironment, my code successfully reads data from the database through JDBCInputFormat. However, I need streaming mode (and it now seems that DataSet and DataStream are not interchangeable). Are there any additional functions that must be called before StreamExecutionEnvironment creates the JDBC input?
Thanks

On 5 June 2016 at 18:26, David Olsen <davidolsen4...@gmail.com> wrote:

> I removed the open() call when constructing the jdbc input format, but I
> still get the "couldn't access resultSet" error.
>
> Caused by: java.io.IOException: Couldn't access resultSet
>     at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:179)
>     at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
>     at org.apache.flink.streaming.api.functions.source.FileSourceFunction.run(FileSourceFunction.java:124)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:164)
>     ... 7 more
>
> Anything I should check as well?
>
> Thanks
>
>
> On 5 June 2016 at 17:26, Chesnay Schepler <ches...@apache.org> wrote:
>
>> you are not supposed to call open yourself.
>>
>>
>> On 05.06.2016 11:05, David Olsen wrote:
>>
>>> Following the sample on the flink website[1] to test jdbc, I encountered
>>> an error "Couldn't access resultSet". It looks like nextRecord is
>>> called before the open() function. However, I've called open() when I
>>> construct the jdbc input format. Any functions I should call before job
>>> submission?
>>>
>>> def jdbc()= {
>>>   val jdbcif = JDBCInputFormat.buildJDBCInputFormat
>>>     .setDrivername("com.mysql.jdbc.Driver")
>>>     .setDBUrl("jdbc:mysql://localhost/test")
>>>     .setQuery("select name from department")
>>>     .setUsername(...)
>>>     .setPassword(...)
>>>     .finish
>>>   jdbcif.open(null)
>>>   jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]]
>>> }
>>>
>>> def main(args: Array[String]) {
>>>   // -> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]], STRING_TYPE_INFO)
>>>   val stream = env.createInput(jdbc(), evidence$6)
>>>   stream.map(new MapFunction[Tuple1[String], String]() {
>>>     override def map(tuple: Tuple1[String]): String = tuple.getField(0)
>>>   }).returns(classOf[String]).writeAsText("/path/to/jdbc")
>>>   env.execute("test-flink")
>>> }
>>>
>>> The version used in this test is flink 1.0.3 and scala 2.11.
>>>
>>> [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/