下载个 flink-sql-connector-kafka 这个jar 放在lib下试下
















在 2021-01-09 02:08:12,"inza9hi" <[email protected]> 写道:
>搜了下之前的邮件,貌似没有发现和我同样的问题。
>
>lib 下的Jar   
>flink-csv-1.11.3.jar                              
>flink-table-blink_2.11-1.11.3.jar
>flink-dist_2.11-1.11.3.jar                        
>flink-table_2.11-1.11.3.jar
>flink-jdbc_2.11-1.11.3.jar                         log4j-1.2-api-2.12.1.jar
>flink-json-1.11.3.jar                              log4j-api-2.12.1.jar
>flink-shaded-zookeeper-3.4.14.jar                  log4j-core-2.12.1.jar
>flink-sql-connector-elasticsearch6_2.11-1.11.3.jar
>log4j-slf4j-impl-2.12.1.jar
>flink-sql-connector-kafka_2.11-1.11.3.jar        
>mysql-connector-java-5.1.48.jar
>
>flink bin/sql-client.sh embedded
>
>CREATE TABLE user_behavior (
>    user_id BIGINT,
>    item_id BIGINT,
>    category_id BIGINT,
>    behavior STRING,
>    ts TIMESTAMP(3),
>    proctime as PROCTIME(),  -- 通过计算列产生一个处理时间列
>    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
>在ts上定义watermark,ts成为事件时间列
>
>) WITH (
>    'connector' = 'kafka',  -- 使用 kafka connector
>    'topic' = 'data_test',  -- kafka topic
>    'startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
>    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
>    'format' = 'json'  -- 数据源格式为 json
>);
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

回复