我遇到个问题,请教一下:
环境:Flink 1.11,在 IDEA 中本地运行。
参考 wuchong 大神的 demo,想把其中用 SQL Client 跑的例子改成 Java 程序,第一个例子是统计每小时的成交量。
Kafka 里的数据可以读到,但是没有任何输出结果:一开始写入 ES 没反应,后来改为 print sink 还是没反应。
https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
求助,各位



下面依次是代码、pom、运行结果,以及从 Kafka 读到的原始数据

        // Create the streaming execution environment.
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // The Kafka topic has a single partition (the log shows only "user_behavior-0"),
        // while the job runs with a parallelism greater than 1 (print output is prefixed
        // with "3>"). Source subtasks that own no partition never emit a watermark, so the
        // combined watermark never advances and the event-time TUMBLE window below never
        // fires. Running with parallelism 1 lets the watermark progress.
        // NOTE(review): alternatively keep the parallelism and mark idle sources via the
        // 'table.exec.source.idle-timeout' option -- confirm it is honoured by this version.
        bsEnv.setParallelism(1);

        bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // Configure the state backend.
        bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));

        EnvironmentSettings bsSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Kafka source table. The watermark is derived from the event-time column `ts`,
        // so window results are only emitted once `ts` passes the window end plus 5 seconds.
        String sourceDDL ="CREATE TABLE user_behavior (" +
                "user_id BIGINT," +
                "item_id BIGINT," +
                "category_id BIGINT," +
                "behavior STRING," +
                "ts TIMESTAMP (3)," +
                "proctime AS PROCTIME ()," +
                "WATERMARK FOR ts AS ts-INTERVAL '5' SECOND) " +
                "WITH (" +
                "'connector'='kafka'," +
                "'topic'='user_behavior'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'properties.bootstrap.servers'='localhost:9092'," +
                "'format'='json'" +
                ")";


        // Originally wrote to Elasticsearch; switched to the print connector for debugging.
/*        String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
                "hour_of_day BIGINT," +
                "buy_cnt BIGINT" +
                ") WITH (" +
                "'connector'='elasticsearch-7'," +
                "'hosts'='http://localhost:9200'," +
                "'index'='buy_cnt_per_hour')";*/
        String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
                 "hour_of_day BIGINT," +
                 "buy_cnt BIGINT" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";


        // Count 'buy' events per one-hour event-time tumbling window.
        // The SELECT line is kept on a single source line: a Java string literal
        // cannot span a line break.
        String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" +
                "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day , COUNT(*) as buy_cnt\n" +
                "FROM user_behavior\n" +
                "WHERE behavior = 'buy'\n" +
                "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";



        // Register the source and sink tables.
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);
//        tableResult.print();

        // Submit the INSERT INTO job.
        tEnv.executeSql(transformationDDL);

pom
<dependencies>

    <!-- Table API bridge between the Java DataStream API and the Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Blink planner, selected in code via useBlinkPlanner() -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Common Table API classes (provided scope) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Flink client classes -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- JSON format, used by 'format'='json' on the Kafka table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Elasticsearch 7 connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Kafka SQL connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>

    <!-- Flink web UI runtime (provided scope) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

</dependencies>

运行结果
01:15:12,358 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  -
Kafka version: unknown
01:15:12,358 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  -
Kafka commitId: unknown
01:15:12,358 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  -
Kafka startTimeMs: 1597338912355
01:15:12,361 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer 
- [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s):
user_behavior-0
01:15:12,365 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState
 
- [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset
of partition user_behavior-0
01:15:12,377 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - [Consumer
clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState
 
- [Consumer clientId=consumer-20, groupId=null] Resetting offset for
partition user_behavior-0 to offset 0.
01:15:12,545 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 1 (type=CHECKPOINT) @ 1597338912539 for job
c10220b65246e8269defa48f441a7e09.
01:15:12,709 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 1 for job c10220b65246e8269defa48f441a7e09 (14080
bytes in 169 ms).
01:15:17,541 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 2 (type=CHECKPOINT) @ 1597338917540 for job
c10220b65246e8269defa48f441a7e09.
01:15:17,553 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 2 for job c10220b65246e8269defa48f441a7e09 (14752
bytes in 11 ms).
01:15:22,546 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 3 (type=CHECKPOINT) @ 1597338922545 for job
c10220b65246e8269defa48f441a7e09.
01:15:22,558 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 3 for job c10220b65246e8269defa48f441a7e09 (15004
bytes in 12 ms).


原始数据

3>
+I(999602,4024409,883960,cart,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(30616,1693200,4022701,pv,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(145183,3533745,1102540,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(323010,3376212,1574064,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(944547,2640409,2465336,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(232939,1976318,411153,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3>
+I(355996,5161162,1582197,buy,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(443987,3791622,1464116,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复