open() not being called is a valuable observation, but from then on I have
trouble following you.
Within hasNext() we first check whether exhausted is true, and if so we
return false. Since it is initialized to false we will not return there;
this seems like correct behaviour.
I have one question though: what parallelism does the source run with?
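For reference, the hasNext() being discussed looks roughly like this (a paraphrase in Scala from memory, not the actual 1.0.x Java source, which is Java; the SplitIteratorSketch name and the provider function are my own stand-ins for the task's split provider):

    import org.apache.flink.core.io.InputSplit

    // Sketch only: mimics the split iterator created inside FileSourceFunction.
    // 'exhausted' starts out false; hasNext() only short-circuits once the provider has run dry.
    class SplitIteratorSketch(provider: () => InputSplit) extends Iterator[InputSplit] {
      private var exhausted = false
      private var nextSplit: InputSplit = _

      override def hasNext: Boolean = {
        if (exhausted) false                       // only after the provider returned no split
        else if (nextSplit != null) true           // a split is already buffered
        else {
          val split = provider()                   // assumption: splits come from the task's InputSplitProvider
          if (split == null) { exhausted = true; false }
          else { nextSplit = split; true }
        }
      }

      override def next(): InputSplit = {          // assumes hasNext() was checked first
        val s = nextSplit; nextSplit = null; s
      }
    }

If that paraphrase is accurate, a false return would simply mean that no split was handed to this subtask, which is why the parallelism is interesting.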
On 06.06.2016 17:37, David Olsen wrote:
After recompiling the 1.0.3 source and testing it, I discovered that
InputFormat.open() in FileSourceFunction never gets called because
splitIterator.hasNext() returns false. It looks like getInputSplits()
creates the Iterator<InputSplit> with its 'exhausted' variable initialized
to false, and the following statement only opens the input format when
hasNext() is true; but hasNext() always returns false here, so
InputFormat.open() is never reached. Is 'exhausted' supposed to behave that
way (initialized to false, then hasNext() is checked but unfortunately
always returns false)?
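In other words, my reading of the loop that gates the open() call is roughly the following (a paraphrase, not the actual Java source; 'drive' and 'emit' are just names I made up for the sketch):

    import org.apache.flink.api.common.io.InputFormat
    import org.apache.flink.core.io.InputSplit

    // Sketch of the gating logic: if splits.hasNext is always false,
    // format.open() is never called and a later nextRecord() hits the
    // NullPointerException from my earlier mail.
    def drive[T <: AnyRef](format: InputFormat[T, InputSplit],
                           splits: Iterator[InputSplit])(emit: T => Unit): Unit = {
      while (splits.hasNext) {                     // false here means open() below is never reached
        format.open(splits.next())
        while (!format.reachedEnd()) {
          val rec = format.nextRecord(null.asInstanceOf[T])
          if (rec != null) emit(rec)
        }
        format.close()
      }
    }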
I appreciate any suggestions. Thanks.
On 6 June 2016 at 15:46, Chesnay Schepler <ches...@apache.org> wrote:
The JDBC InputFormat does not, and never has, used the configuration.
On 06.06.2016 09:27, Aljoscha Krettek wrote:
The problem could be that open() is not called with a proper
Configuration object in streaming mode.
On Sun, 5 Jun 2016 at 19:33 Stephan Ewen <se...@apache.org> wrote:
Hi David!
You are using the JDBC input format, which was written for the batch API,
in the streaming API. While that should actually work, it is a fairly new
and less-tested feature. Let's double-check that the call to open() is
properly forwarded.
On Sun, Jun 5, 2016 at 12:47 PM, David Olsen <davidolsen4...@gmail.com> wrote:
After switching to org.apache.flink.api.java.ExecutionEnvironment, my code
can successfully read data from the database through JDBCInputFormat. But I
need streaming mode (and it now seems that DataSet and DataStream are not
interchangeable).
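For reference, the batch version that works for me looks roughly like this (trimmed; the credentials and output path are placeholders):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
    import org.apache.flink.api.java.tuple.Tuple1
    import org.apache.flink.api.java.typeutils.TupleTypeInfo

    // Batch (DataSet) variant of the same read; this one goes through open() fine.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val jdbcif = JDBCInputFormat.buildJDBCInputFormat
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost/test")
      .setQuery("select name from department")
      .setUsername("user")                             // placeholder
      .setPassword("pass")                             // placeholder
      .finish
      .asInstanceOf[JDBCInputFormat[Tuple1[String]]]
    val names = env.createInput(jdbcif,
      new TupleTypeInfo(classOf[Tuple1[String]], BasicTypeInfo.STRING_TYPE_INFO))
    names.writeAsText("/path/to/jdbc-batch")           // placeholder path
    env.execute("test-flink-batch")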
Are there any additional functions that need to be executed before
StreamExecutionEnvironment creates the JDBC input?
Thanks
On 5 June 2016 at 18:26, David Olsen <davidolsen4...@gmail.com> wrote:
I removed the call to open() when constructing the JDBC input format, but I
still get the "Couldn't access resultSet" error.
Caused by: java.io.IOException: Couldn't access resultSet
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:179)
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
    at org.apache.flink.streaming.api.functions.source.FileSourceFunction.run(FileSourceFunction.java:124)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:164)
    ... 7 more
Is there anything else I should check?
Thanks
On 5 June 2016 at 17:26, Chesnay Schepler <ches...@apache.org> wrote:
You are not supposed to call open() yourself.
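That is, just drop the jdbcif.open(null) line and hand the format straight to createInput; something like this sketch of your jdbc() (credentials elided as in your code):

    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
    import org.apache.flink.api.java.tuple.Tuple1

    def jdbc() = {
      // Build the format only; the runtime calls open() with the actual input split later.
      JDBCInputFormat.buildJDBCInputFormat
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://localhost/test")
        .setQuery("select name from department")
        .setUsername(...)                              // elided, as in the original
        .setPassword(...)
        .finish
        .asInstanceOf[JDBCInputFormat[Tuple1[String]]]
    }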
On 05.06.2016 11:05, David Olsen wrote:
Following the sample on the Flink website [1] to test JDBC, I encountered
the error "Couldn't access resultSet". It looks like nextRecord() is called
before the open() function. However, I did call open() when constructing
the JDBC input format. Are there any functions I should call before job
submission?
def jdbc() = {
  val jdbcif = JDBCInputFormat.buildJDBCInputFormat
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost/test")
    .setQuery("select name from department")
    .setUsername(...)
    .setPassword(...)
    .finish
  jdbcif.open(null)
  jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]]
}

def main(args: Array[String]) {
  // -> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]], STRING_TYPE_INFO)
  val stream = env.createInput(jdbc(), evidence$6)
  stream.map(new MapFunction[Tuple1[String], String]() {
    override def map(tuple: Tuple1[String]): String = tuple.getField(0)
  }).returns(classOf[String]).writeAsText("/path/to/jdbc")
  env.execute("test-flink")
}
The versions used in this test are Flink 1.0.3 and Scala 2.11.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/