Hi, 
    You could try TO_TIMESTAMP(FROM_UNIXTIME(transaction_time)) to convert the long to a timestamp.
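
A minimal sketch of what the INSERT could look like, assuming the view `tran` and the sink `transactions` from your code. The class and method names here are hypothetical; only the SQL strings matter to Flink. Note that FROM_UNIXTIME reads its argument as epoch seconds, so if transaction_time actually holds epoch milliseconds (an assumption, since your mail does not say), the second variant with TO_TIMESTAMP_LTZ(transaction_time, 3) and an explicit CAST may be the better fit. As far as I know that function needs Flink 1.13 or later.

```java
import java.time.Instant;

// Hypothetical helper class; the SQL strings are what you would pass to tEnv.executeSql(...).
class EpochToTimestampSql {

    // Variant 1: transaction_time holds epoch SECONDS.
    // FROM_UNIXTIME formats the seconds as a 'yyyy-MM-dd HH:mm:ss' string,
    // and TO_TIMESTAMP parses that string into a TIMESTAMP(3).
    static String insertFromEpochSeconds() {
        return "INSERT INTO transactions "
                + "SELECT account_id, amount, "
                + "TO_TIMESTAMP(FROM_UNIXTIME(transaction_time)) "
                + "FROM tran";
    }

    // Variant 2: transaction_time holds epoch MILLISECONDS.
    // TO_TIMESTAMP_LTZ(value, 3) yields a TIMESTAMP_LTZ(3); the CAST matches the sink column type.
    static String insertFromEpochMillis() {
        return "INSERT INTO transactions "
                + "SELECT account_id, amount, "
                + "CAST(TO_TIMESTAMP_LTZ(transaction_time, 3) AS TIMESTAMP(3)) "
                + "FROM tran";
    }

    // Plain-JDK check of which unit a sample value is in, rendered in UTC.
    static String asUtc(long value, boolean isMillis) {
        Instant instant = isMillis ? Instant.ofEpochMilli(value) : Instant.ofEpochSecond(value);
        return instant.toString();
    }

    public static void main(String[] args) {
        // Print one sample value under both interpretations to see which gives a sensible date.
        System.out.println(asUtc(1702453010L, false));
        System.out.println(asUtc(1702453010000L, true));
        System.out.println(insertFromEpochSeconds());
        System.out.println(insertFromEpochMillis());
    }
}
```

Either string would replace "insert into transactions select * from tran" in your executeSql call, with the columns listed explicitly so the converted value lines up with the TIMESTAMP(3) sink column. Both conversions go through the session time zone, so the result likely depends on your table.local-time-zone setting.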




--

    Best!
    Xuyang





On 2023-12-13 15:36:50, "ha.fen...@aisino.com" <ha.fen...@aisino.com> wrote:
>In the documentation, the data comes from Kafka:
>tEnv.executeSql("CREATE TABLE transactions (\n" +
>    "    account_id  BIGINT,\n" +
>    "    amount      BIGINT,\n" +
>    "    transaction_time TIMESTAMP(3),\n" +
>    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
>    ") WITH (\n" +
>    "    'connector' = 'kafka',\n" +
>    "    'topic'     = 'transactions',\n" +
>    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
>    "    'format'    = 'csv'\n" +
>    ")");
>
>How do I produce data that matches the TIMESTAMP(3) type of transaction_time in Kafka?
>
>My entity class has these fields:
>private long account_id;
>private int amount;
>private long transaction_time;
>
>I insert into Kafka with the following code:
>
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>        DataStream<Transactions> streamSource = env.addSource(new TransactionSource());
>        tEnv.createTemporaryView("tran", streamSource, $("account_id"), $("amount"), $("transaction_time"));
>        tEnv.executeSql("CREATE TABLE transactions (\n" +
>                "    account_id  BIGINT,\n" +
>                "    amount      INT,\n" +
>                "    transaction_time TIMESTAMP(3)\n" +
>                ") WITH (\n" +
>                "    'connector' = 'kafka',\n" +
>                "    'topic'     = 'kafka_mysql_transactions',\n" +
>                "    'properties.bootstrap.servers' = '172.24.6.109:9092',\n" +
>                "    'format'    = 'json'\n" +
>                ")");
>        tEnv.executeSql("insert into transactions select * from tran ");
>This fails with a type mismatch error like the following:
>Query schema: [account_id: BIGINT NOT NULL, amount: INT NOT NULL, transaction_time: BIGINT NOT NULL]
>Sink schema:  [account_id: BIGINT, amount: INT, transaction_time: TIMESTAMP(3)]
