Hi, 你可以试一下用TO_TIMESTAMP(FROM_UNIXTIME(transaction_time)) 将long转为timestamp
-- Best! Xuyang 在 2023-12-13 15:36:50,"ha.fen...@aisino.com" <ha.fen...@aisino.com> 写道: >文档中数据来源于kafka >tEnv.executeSql("CREATE TABLE transactions (\n" + > " account_id BIGINT,\n" + > " amount BIGINT,\n" + > " transaction_time TIMESTAMP(3),\n" + > " WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' > SECOND\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = 'transactions',\n" + > " 'properties.bootstrap.servers' = 'kafka:9092',\n" + > " 'format' = 'csv'\n" + > ")"); > >怎么对应kafka的transaction_time中的TIMESTAMP(3)类型? > >我是用实体类型为 >private long account_id; >private int amount; >private long transaction_time; > >通过下面插入kafka > >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > DataStream<Transactions> streamSource = env.addSource(new > TransactionSource()); > tEnv.createTemporaryView("tran",streamSource, > $("account_id"),$("amount"),$("transaction_time")); > tEnv.executeSql("CREATE TABLE transactions (\n" + > " account_id BIGINT,\n" + > " amount INT,\n" + > " transaction_time TIMESTAMP(3)\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = 'kafka_mysql_transactions',\n" + > " 'properties.bootstrap.servers' = '172.24.6.109:9092',\n" + > " 'format' = 'json'\n" + > ")"); > tEnv.executeSql("insert into transactions select * from tran "); >会报类似类型不匹配的错误 >Query schema: [account_id: BIGINT NOT NULL, amount: INT NOT NULL, >transaction_time: BIGINT NOT NULL] >Sink schema: [account_id: BIGINT, amount: INT, transaction_time: TIMESTAMP(3)]