Hi,
I'm trying to use a PostgreSQL database with the PostGIS spatial extension as a sink for my data.
It works perfectly when I INSERT my data, but when I try to perform an UPSERT it fails with the following errors:
15/01/14 12:52:08 ERROR operators.DataSinkTask: Error in user code: writeRecord() failed: DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
java.lang.IllegalArgumentException: writeRecord() failed
    at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:132)
    at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:173)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.sql.BatchUpdateException: Batch entry 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) was aborted. Call getNextException to see the cause.
    at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2743)
    at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleResultRows(AbstractJdbc2Statement.java:2692)
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleResultRows(QueryExecutorImpl.java:333)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1853)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1130)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:396)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2892)
    at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:127)
    ... 4 more
15/01/14 12:52:08 WARN operators.DataSinkTask: Error closing the ouput format.: DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
java.lang.IllegalArgumentException: close() failed
    at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:188)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.postgresql.util.PSQLException: This statement has been closed.
    at org.postgresql.jdbc2.AbstractJdbc2Statement.checkClosed(AbstractJdbc2Statement.java:2634)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2832)
    at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:185)
    ... 3 more
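The BatchUpdateException above only reports that batch entry 0 was aborted; the driver points to getNextException() for the actual reason, and the Flink log does not print it. A minimal sketch for surfacing that reason, assuming the exception can be caught in your own code (for example when running the same SELECT through plain JDBC outside Flink): walk getCause() until the first SQLException, then follow its getNextException() chain. The class and method names are hypothetical, and main() only simulates the failure with placeholder messages.

```java
import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class BatchErrorDetails {

    /**
     * Finds the first SQLException in the cause chain of t (e.g. below a
     * wrapper such as Flink's IllegalArgumentException) and returns its
     * message followed by the messages of every exception chained to it
     * via getNextException().
     */
    static List<String> collectMessages(Throwable t) {
        List<String> messages = new ArrayList<>();
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SQLException) {
                // Follow the JDBC-specific "next exception" chain
                for (SQLException sql = (SQLException) cur; sql != null; sql = sql.getNextException()) {
                    messages.add(sql.getMessage());
                }
                break;
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        // Simulated failure shaped like the log above: a wrapper exception
        // whose cause is a BatchUpdateException with a chained next exception.
        BatchUpdateException batchError =
                new BatchUpdateException("simulated batch failure", new int[0]);
        batchError.setNextException(new SQLException("simulated underlying cause"));
        Throwable wrapped = new IllegalArgumentException("writeRecord() failed", batchError);

        for (String msg : collectMessages(wrapped)) {
            System.out.println(msg);
        }
    }
}
```

With a real driver exception, the second and later messages are the ones that explain why the entry was aborted.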
My Java code is:
var.output(
    // build and configure the OutputFormat
    JDBCOutputFormat
        .buildJDBCOutputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
        .setUsername("postgres")
        .setPassword("")
        .setQuery("SELECT upsert_nsp_db(?,?,?);") // lat, lng, val
        .finish()
);
My upsert function is the following:
CREATE FUNCTION upsert_db(lat DOUBLE PRECISION, lng DOUBLE PRECISION, m FLOAT)
RETURNS VOID AS
$$
BEGIN
    LOOP
        -- first try to update the key
        UPDATE nsp_db SET mean = m
         WHERE ST_Equals(location, ST_SetSRID(ST_MakePoint(lng, lat), 26918));
        IF found THEN
            RETURN;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            INSERT INTO nsp_db (location, mean)
            VALUES (ST_SetSRID(ST_MakePoint(lng, lat), 26918), m);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            -- do nothing, and loop to try the UPDATE again
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;
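The function uses the common update-then-insert retry loop. As a hedged illustration of that control flow only (not of PostGIS or PostgreSQL behavior), the hypothetical class below replaces the nsp_db table with a ConcurrentHashMap: replace() plays the UPDATE (it succeeds only if the key exists), putIfAbsent() plays the INSERT, and a failed putIfAbsent() plays the unique_violation that sends the loop back to the UPDATE. Exact coordinate equality stands in for ST_Equals. Note that the SQL version only reaches its retry branch if nsp_db has a unique constraint on location, which is an assumption here since the table definition isn't shown.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class UpsertLoopSketch {

    /** Hypothetical key standing in for ST_SetSRID(ST_MakePoint(lng, lat), 26918). */
    static final class Point {
        final double lng;
        final double lat;

        Point(double lng, double lat) { this.lng = lng; this.lat = lat; }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Point)) return false;
            Point p = (Point) o;
            return Double.compare(lng, p.lng) == 0 && Double.compare(lat, p.lat) == 0;
        }

        @Override public int hashCode() { return Objects.hash(lng, lat); }
    }

    // In-memory stand-in for nsp_db: location -> mean
    private final ConcurrentMap<Point, Double> table = new ConcurrentHashMap<>();

    /** Same control flow as the PL/pgSQL function: try UPDATE, else INSERT, retry on conflict. */
    void upsert(double lat, double lng, double m) {
        Point key = new Point(lng, lat);
        while (true) {
            // UPDATE ...; IF found THEN RETURN: replace() returns null if the key is absent
            if (table.replace(key, m) != null) {
                return;
            }
            // INSERT: putIfAbsent() returns null when this call inserted the value
            if (table.putIfAbsent(key, m) == null) {
                return;
            }
            // A concurrent writer inserted the key first (the unique_violation case): loop again
        }
    }

    Double mean(double lat, double lng) {
        return table.get(new Point(lng, lat));
    }

    public static void main(String[] args) {
        UpsertLoopSketch db = new UpsertLoopSketch();
        db.upsert(-42.48, -73.76, 8.0); // no entry yet: inserted
        db.upsert(-42.48, -73.76, 3.5); // entry exists: updated
        System.out.println(db.mean(-42.48, -73.76)); // prints 3.5
    }
}
```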
I suspect it is some kind of timeout, but is there any way I could solve
this issue?
Thanks!