Hi,

I'm trying to write to a Postgres database from Python using the portable runner.

The WriteToJdbc API [1] has a "statement" parameter that lets us specify a SQL statement:

driver_class_name – name of the jdbc driver class
jdbc_url – full jdbc url to the database.
username – database username
password – database password
statement – sql statement to be executed
^^

The write works as expected if I don't specify a "statement" parameter.

If I do specify a statement, Beam ignores it.

I have specified the following statement, which is equivalent to an UPSERT in Postgres:

INSERT INTO jdbc_external_test_write_v2a (id, name, id_name)
VALUES (?, ?, ?)
ON CONFLICT ON CONSTRAINT jdbc_external_test_write_v2a_pkey
DO UPDATE SET id = EXCLUDED.id, name = EXCLUDED.name, id_name = EXCLUDED.id_name
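Independently of why Beam ignores the statement, a hand-written INSERT is easy to get subtly wrong: the column list and the `?` placeholders in `VALUES (...)` must line up one to one. A naive sanity check for that, sketched in plain Python; `check_insert_placeholders` is a hypothetical helper (not part of Beam), and the regexes assume a single-row `INSERT INTO <table> (<columns>) VALUES (<placeholders>)` with no `?` or `)` inside string literals:

```python
import re

# Naive patterns: single-row INSERT with an explicit column list,
# and no '?' or ')' inside string literals or quoted identifiers.
_COLUMNS = re.compile(r'INSERT\s+INTO\s+[\w."]+\s*\(([^)]*)\)', re.IGNORECASE)
_VALUES = re.compile(r'VALUES\s*\(([^)]*)\)', re.IGNORECASE)


def check_insert_placeholders(sql: str) -> int:
    """Return the column count, or raise ValueError if it differs from the '?' count."""
    cols = _COLUMNS.search(sql)
    vals = _VALUES.search(sql)
    if cols is None or vals is None:
        raise ValueError("expected INSERT INTO <table> (<columns>) VALUES (<placeholders>)")
    n_columns = len([c for c in cols.group(1).split(",") if c.strip()])
    n_placeholders = vals.group(1).count("?")
    if n_columns != n_placeholders:
        raise ValueError(f"{n_columns} columns but {n_placeholders} placeholders")
    return n_columns


upsert = """
INSERT INTO jdbc_external_test_write_v2a (id, name, id_name)
VALUES (?, ?, ?)
ON CONFLICT ON CONSTRAINT jdbc_external_test_write_v2a_pkey
DO UPDATE SET id = EXCLUDED.id, name = EXCLUDED.name, id_name = EXCLUDED.id_name
"""
print(check_insert_placeholders(upsert))  # 3
```

With only two placeholders for the three columns, the same call raises `ValueError: 3 columns but 2 placeholders`.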

However, my Postgres logs indicate that Beam is sending the default
statement, not the statement I have specified:

2022-03-30 21:20:09.055 PDT [14504] LOG:  execute S_2: INSERT INTO jdbc_external_test_write_v2a VALUES($1, $2, $3)
2022-03-30 21:20:09.055 PDT [14504] DETAIL:  parameters: $1 = '1', $2 = 'hello', $3 = '1_hello'
2022-03-30 21:20:09.071 PDT [14503] LOG:  execute S_2: ROLLBACK
2022-03-30 21:20:09.072 PDT [14504] ERROR:  duplicate key value violates unique constraint "jdbc_external_test_write_v2a_pkey"
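For comparison, the logged statement has the shape one would expect from a statement generated from the table name alone: no column list, one placeholder per row field, and no ON CONFLICT clause, which is consistent with the duplicate-key error. The sketch below only reproduces that shape so the two can be compared side by side. The helper names are hypothetical and this is not Beam's actual code; it also assumes, as the log suggests, that the Postgres JDBC driver renders `?` placeholders as `$1`, `$2`, and so on:

```python
def default_insert_statement(table_name: str, num_fields: int) -> str:
    """Hypothetical: 'INSERT INTO <table> VALUES(?, ...)', one '?' per row field."""
    placeholders = ", ".join(["?"] * num_fields)
    return f"INSERT INTO {table_name} VALUES({placeholders})"


def number_placeholders(sql: str) -> str:
    """Rewrite each '?' as $1, $2, ... to match the Postgres server log.

    Naive: assumes no '?' appears inside string literals.
    """
    pieces = sql.split("?")
    out = [pieces[0]]
    for i, piece in enumerate(pieces[1:], start=1):
        out.append(f"${i}{piece}")
    return "".join(out)


stmt = default_insert_statement("jdbc_external_test_write_v2a", 3)
print(number_placeholders(stmt))
# INSERT INTO jdbc_external_test_write_v2a VALUES($1, $2, $3)
```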

How can I get Beam to recognize my (override) SQL statement?

Thanks,
Deepak

[1] https://beam.apache.org/releases/pydoc/2.35.0/apache_beam.io.jdbc.html?highlight=jdbc#module-apache_beam.io.jdbc
