Hello, my friends
env: Flink 1.10, Blink Planner

Table source:

CREATE TABLE josn_table (
    order_id VARCHAR,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector.properties.0.key' = 'bootstrap.servers',
    'connector.properties.0.value' = 'localhost:9092',
    'connector.property-version' = '1',
    'connector.startup-mode' = 'earliest-offset',
    'connector.topic' = 'raw',
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'format.derive-schema' = 'true',
    'format.property-version' = '1',
    'format.type' = 'json',
    'update-mode' = 'append'
)

MySQL dim table:

CREATE TABLE ilms_box_d_order (
    id VARCHAR,
    event_time TIMESTAMP(3)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost/mydb',
    'connector.table' = 'test',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'test',
    'connector.password' = 'test',
    'connector.property-version' = '1'
);

DML:

INSERT INTO console_test
SELECT t1.event_time, order_id
FROM josn_table
LEFT JOIN ilms_box_d_order FOR SYSTEM_TIME AS OF josn_table.proc_time AS t1
ON josn_table.order_id = t1.id and josn_table.event_time = t1.event_time;

When I execute this SQL, I get the following exception:

Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp, field index: 1, field value: 2020-05-22T14:00.

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java#L236

Why don't we support LocalDateTime?

Best wishes.
forideal