you are not supposed to call open yourselves.
On 05.06.2016 11:05, David Olsen wrote:
Following the sample on the flink website[1] to test jdbc I
encountered an error "Couldn't access resultSet". It looks like the
nextRecord is called before open() function. However I've called
open() when I construct jdbc input format. Any functions I should call
before job submission?
def jdbc()= {
val jdbcif =
JDBCInputFormat.buildJDBCInputFormat.setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://localhost/test").setQuery("select
name from department").setUsername(...).setPassword(...).finish
jdbcif.open(null)
jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]]
}
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment // ->
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]],
STRING_TYPE_INFO)
val stream = env.createInput(jdbc(), evidence$6)
stream.map ( new MapFunction[Tuple1[String], String]() {
override def map(tuple: Tuple1[String]): String = tuple.getField(0)
}).returns(classOf[String]).writeAsText("/path/to/jdbc")
env.execute("test-flink")
}
The version used in this test is flink 1.0.3 and scala 2.11.
[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/