Hi,
postgres字段包含大小写。
postgres_sink = """
CREATE TABLE alarm_history_data (
`recordId` STRING,
`rowtime` TIMESTAMP(3),
`action` STRING,
`originalState` STRING,
`newState` STRING,
`originalCause` STRING,
`newCause` STRING,
`ser_name` STRING,
`enb` STRING,
`eventTime` STRING,
`ceasedTime` STRING,
`duration` STRING,
`acked` STRING,
`pmdId` STRING,
`pmdTime` STRING,
PRIMARY KEY (`recordId`) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
'connector.table' = 'alarm_history_data',
'connector.driver' = 'org.postgresql.Driver',
'connector.username' = 'postgres',
'connector.password' = 'my_password',
'connector.write.flush.max-rows' = '1'
)
"""
st_env.scan("source").group_by("recordId").select(
"recordId,"
"last_tvalue(actionTime) as rowtime, last_value(action),"
"last_value(originalState) as originalState, last_value(newState),"
"last_value(originalCause), last_value(newCause),"
"last_value(ser_name), last_value(enb), last_value(eventTime),"
"last_value(ceasedTime), last_value(duration), last_value(acked),"
"last_value(pmdId), last_value(pmdTime)"
).insert_into("alarm_history_data")
sink出错,报错是:
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO
alarm_history_data(recordId, rowtime, action, originalState, newState,
originalCause, newCause, ser_name, enb, eventTime, ceasedTime, duration,
acked, pmdId, pmdTime) VALUES ('47357607', '2020-06-03 17:37:44+08',
'Insert', '', 'cleared', '', 'crash', 'Oyama_ENM_MS',
'789198-houshakuzi-RBS6302', '2020-06-03T17:24:57', '2020-06-03T17:29:50',
'293.0', 'No', '0x80000002', '2020-06-03T17:22:46') ON CONFLICT (recordId)
DO UPDATE SET recordId=EXCLUDED.recordId, rowtime=EXCLUDED.rowtime,
action=EXCLUDED.action, originalState=EXCLUDED.originalState,
newState=EXCLUDED.newState, originalCause=EXCLUDED.originalCause,
newCause=EXCLUDED.newCause, ser_name=EXCLUDED.ser_name, enb=EXCLUDED.enb,
eventTime=EXCLUDED.eventTime, ceasedTime=EXCLUDED.ceasedTime,
duration=EXCLUDED.duration, acked=EXCLUDED.acked, pmdId=EXCLUDED.pmdId,
pmdTime=EXCLUDED.pmdTime was aborted: ERROR: column "recordid" of relation
"alarm_history_data" does not exist
请问要怎么解决?要怎样才能在最终的sql语句里面加个引号把字段包起来?
--
Sent from: http://apache-flink.147419.n8.nabble.com/