Thanks Chiwan and Chesnay 
I'm happy :-)
On Mon, Mar 7, 2016 at 14:18, Chiwan Park wrote:
Hi Toletum,

You can initialize the JDBC connection in a RichSinkFunction [1]. It has two 
lifecycle methods, `open` and `close`. The `open` method is called once, before 
the first call to `invoke`. The `close` method is called once, after the last 
call to `invoke`.

Note that you should declare the JDBC connection field as `transient`, so that 
it is not serialized together with the function.
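
The pattern can be sketched with the JDK alone, so that it runs without Flink on the classpath. This is only an illustration under stated assumptions: `FakeConnection`, `CrimeSink` and `roundTrip` are hypothetical names, and `FakeConnection` stands in for a real, non-serializable JDBC `Connection`. The serialization round trip imitates the function object being shipped to a worker before `open` is called.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientSinkSketch {

    // Hypothetical stand-in for a JDBC Connection: deliberately NOT Serializable.
    static class FakeConnection {
        static int opened = 0;                        // how many connections were created
        final String url;
        final List<String> written = new ArrayList<>();
        boolean closed = false;

        FakeConnection(String url) { this.url = url; opened++; }
        void write(String row) { written.add(row); }
        void close() { closed = true; }
    }

    // Mimics the open/invoke/close lifecycle of a rich sink function.
    static class CrimeSink implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String url;                     // plain configuration, serialized with the sink
        private transient FakeConnection connection;  // skipped by Java serialization

        CrimeSink(String url) { this.url = url; }

        // Called once, after deserialization: the only place the connection is created.
        void open() { connection = new FakeConnection(url); }

        // Called once per record: reuses the connection created in open().
        void invoke(String value) { connection.write(value); }

        // Called once at the end: releases the connection.
        void close() { if (connection != null) connection.close(); }

        FakeConnection connection() { return connection; }
    }

    // Serializes and deserializes the sink, as a framework would when shipping it to a worker.
    static CrimeSink roundTrip(CrimeSink sink) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CrimeSink) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        CrimeSink shipped = roundTrip(new CrimeSink("jdbc:example://host/db"));
        shipped.open();
        shipped.invoke("a");
        shipped.invoke("b");
        shipped.invoke("c");
        shipped.close();
        System.out.println("connections opened: " + FakeConnection.opened);
    }
}
```

In an actual Flink job the class would extend `RichSinkFunction` and override `open(Configuration)`, `invoke` and `close` instead, creating the real connection in `open`, for example with `DriverManager.getConnection(...)`. Without `transient`, the round trip above would fail with a `NotSerializableException` as soon as the field held a connection.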

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html

On Mar 7, 2016, at 10:08 PM, [email protected] wrote:

Hi!
I'm building a job that reads from Kafka, does some processing, and then 
writes to a database (Neo4j). Reading from Kafka and the processing both work, 
but I have problems writing to the database over JDBC.
I tried using a SinkFunction. It works, but it creates a new connection every 
time the `invoke` method is called.

--------
DataStream messageStream = this.env.addSource(
    new FlinkKafkaConsumer082(properties.getProperty("topic"),
        new SimpleStringSchema(), properties));

messageStream.map(new StreamingCrimeSplitter())
    .filter(new filterFunction())
    .keyBy(1)
    .addSink(new sinkFunction());

--------
--------
public class sinkFunction implements SinkFunction {
    private static final long serialVersionUID = 2859601213304525959L;

    @Override
    public void invoke(Tuple7 crime) throws Exception {
        System.out.println(crime.f0);
        // JDBC connection is created here, on every call
    }
}
--------

Does anybody know how I could use just one connection? I tried creating it in 
the constructor, but the JDBC connection is not serializable.

Thanks
Toletum
