Hi, sorry for the late reply. I'm not a PostgreSQL expert, but I'll try my best. Also, I'm not aware of many users of the JDBC output format, so I guess it's quite likely that there are open issues with it.
The exceptions have been thrown by the PostgreSQL JDBC driver. As far as I can see, there are two exceptions: one in the writeRecord() call and one in the close() call. I think we can ignore the exception in close(), because it's somewhat expected that we cannot execute further queries once parts of the batch have failed. I suspect that the batched inserts performed by the OutputFormat are not compatible with the upsert function. The first stack trace goes through BatchResultHandler.handleResultRows(), so the driver may be aborting the batch because the SELECT returns a result row; the JDBC spec allows executeBatch() to throw a BatchUpdateException when a batched command attempts to return a result set. (Have a look at the code to see how we do batched inserts: https://github.com/apache/flink/blob/b904b0041cf97b2c6181b1985afc457ed01cf626/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ) I would recommend prototyping a new OutputFormat that performs regular (non-batched) queries, to see whether that works with upserts.

Robert

On Wed, Jan 14, 2015 at 1:16 PM, Benoît Hanotte wrote:

> Hi,
>
> I'm trying to use a spatial PostgreSQL database with PostGIS as a sink for my data.
> It works perfectly when performing an INSERT with my data, but when trying to perform an UPSERT it fails with the following errors:
>
> 15/01/14 12:52:08 ERROR operators.DataSinkTask: Error in user code: writeRecord() failed: DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
> java.lang.IllegalArgumentException: writeRecord() failed
>     at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:132)
>     at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:173)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>     at java.lang.Thread.run(Thread.java:744)
> Caused by: java.sql.BatchUpdateException: L'élément du batch 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) a été annulé. Appeler getNextException pour en connaître la cause.
> (Translates into: Batch entry 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) was aborted. Call getNextException to see the cause.)
>     at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2743)
>     at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleResultRows(AbstractJdbc2Statement.java:2692)
>     at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleResultRows(QueryExecutorImpl.java:333)
>     at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1853)
>     at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1130)
>     at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:396)
>     at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2892)
>     at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:127)
>     ... 4 more
> 15/01/14 12:52:08 WARN operators.DataSinkTask: Error closing the ouput format.: DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
> java.lang.IllegalArgumentException: close() failed
>     at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:188)
>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>     at java.lang.Thread.run(Thread.java:744)
> Caused by: org.postgresql.util.PSQLException: Ce statement a été fermé.
> (Translates into: This statement has been closed.)
>     at org.postgresql.jdbc2.AbstractJdbc2Statement.checkClosed(AbstractJdbc2Statement.java:2634)
>     at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2832)
>     at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:185)
>     ... 3 more
>
> My Java code is:
>
> var.output(
>     // build and configure OutputFormat
>     JDBCOutputFormat
>         .buildJDBCOutputFormat()
>         .setDrivername("org.postgresql.Driver")
>         .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
>         .setUsername("postgres")
>         .setPassword("")
>         .setQuery("SELECT upsert_nsp_db(?,?,?);") // lat, lng, val
>         .finish()
> );
>
> My upsert function is the following:
>
> CREATE FUNCTION upsert_db(lat DOUBLE PRECISION, lng DOUBLE PRECISION, m FLOAT) RETURNS VOID AS
> $$
> BEGIN
>     LOOP
>         -- first try to update the key
>         UPDATE nsp_db SET mean = m WHERE ST_Equals(location, ST_SetSRID(ST_MakePoint(lng,lat),26918));
>         IF found THEN
>             RETURN;
>         END IF;
>         -- not there, so try to insert the key
>         -- if someone else inserts the same key concurrently,
>         -- we could get a unique-key failure
>         BEGIN
>             INSERT INTO nsp_db (location, mean) VALUES (ST_SetSRID(ST_MakePoint(lng,lat),26918), m);
>             RETURN;
>         EXCEPTION WHEN unique_violation THEN
>             -- do nothing, and loop to try the UPDATE again
>         END;
>     END LOOP;
> END;
> $$
> LANGUAGE plpgsql;
>
> I suspect it is some kind of timeout, but is there any way I could solve this issue?
>
> Thanks!
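Regarding the prototype OutputFormat that performs regular queries instead of batches: a minimal sketch of what its writeRecord() could delegate to might look like this. It is plain JDBC with no Flink dependency; RowByRowQueryWriter and its methods are hypothetical names, not Flink API, and the OutputFormat wiring (opening the connection in open(), calling write() with the tuple's fields in writeRecord(), closing everything in close()) is left out. It assumes the query may return a result set, as SELECT upsert_nsp_db(?,?,?) does, so it calls PreparedStatement.execute() once per record instead of addBatch()/executeBatch().

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Hypothetical helper (not part of Flink): runs the query once per record
 * instead of collecting records into a JDBC batch, so queries that return
 * a result set, such as SELECT upsert_nsp_db(?,?,?), are accepted.
 */
public class RowByRowQueryWriter implements AutoCloseable {
    private final PreparedStatement statement;
    private int executedCount = 0;

    public RowByRowQueryWriter(Connection connection, String query) throws SQLException {
        // Prepare once and reuse the statement for every record
        this.statement = connection.prepareStatement(query);
    }

    /** Binds the record's fields to the placeholders and executes the query immediately. */
    public void write(Object... fields) throws SQLException {
        for (int i = 0; i < fields.length; i++) {
            statement.setObject(i + 1, fields[i]); // JDBC parameter indices are 1-based
        }
        // execute() accepts statements that produce a result set, unlike executeBatch()
        if (statement.execute()) {
            // The function returns VOID, so drain and close the result without using it
            try (ResultSet ignored = statement.getResultSet()) {
                while (ignored.next()) {
                    // nothing to do with the return value
                }
            }
        }
        executedCount++;
    }

    /** Number of records executed so far. */
    public int getExecutedCount() {
        return executedCount;
    }

    @Override
    public void close() throws SQLException {
        statement.close();
    }
}
```

Each record costs one round trip to the database, so this will be slower than batching; the point is only to check whether the upsert works once the batch is out of the picture.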
