// Turn every raw JSON line into a DataSource, stamp it with an event time in
// milliseconds, and keep only records whose region/category/code ids are all
// positive and whose union_id is present.
DataStream<DataSource> skuDataStream = stream
        .map((MapFunction<String, DataSource>) rawJson -> {
            DataSource parsed = JSONObject.parseObject(rawJson, DataSource.class);
            // event_ts = log_creation_time parsed with `pattern`, expressed in millis.
            parsed.setEvent_ts(
                    dateTime.parse(parsed.getLog_creation_time(), pattern).getMillis());
            return parsed;
        })
        .filter((FilterFunction<DataSource>) candidate -> {
            // The checks run in the same order, and with the same short-circuiting,
            // as one long && chain: region path, then code/categories, then union_id.
            boolean hasRegionPath = candidate.getregion_id() > 0
                    && candidate.getprovince_id() > 0
                    && candidate.getCity_id() > 0
                    && candidate.getDistrict_id() > 0;
            if (!hasRegionPath) {
                return false;
            }
            boolean hasCodeAndCategories = candidate.getCode() > 0
                    && candidate.getCategory1_id() > 0
                    && candidate.getCategory2_id() > 0
                    && candidate.getCategory3_id() > 0;
            if (!hasCodeAndCategories) {
                return false;
            }
            return candidate.getUnion_id() != null;
        });

        // Schema for the table view of skuDataStream. Physical columns are declared
        // in output order; row_ltz is a computed event-time column derived from the
        // millisecond event_ts field and carries a watermark that lags it by 10 seconds.
        Schema.Builder schemaBuilder = Schema.newBuilder()
                .column("code", "bigint")
                .column("flow_source", "string");
        String[] bigintColumns = {
                "origin_sku_id", "sku_id",
                "region_id", "province_id", "city_id", "district_id",
                "category1_id", "category2_id", "category3_id"
        };
        for (String bigintColumn : bigintColumns) {
            schemaBuilder = schemaBuilder.column(bigintColumn, "bigint");
        }
        String[] stringColumns = {"union_id", "channel1_id", "channel2_id"};
        for (String stringColumn : stringColumns) {
            schemaBuilder = schemaBuilder.column(stringColumn, "string");
        }
        Schema schema = schemaBuilder
                .columnByExpression(
                        "row_ltz", Expressions.callSql("TO_TIMESTAMP_LTZ(event_ts, 3)"))
                .watermark("row_ltz", "row_ltz - INTERVAL '10' SECOND")
                .build();

        // Expose the stream to SQL under the name used by the query's FROM clause.
        Table inputTable = tableEnv.fromDataStream(skuDataStream, schema);
        tableEnv.createTemporaryView("source_table", inputTable);

        // DDL for the output table: window bounds as strings, the eight grouping
        // dimensions, and the seven distinct-user counters (uv, uv1..uv6). Rows are
        // written with the 'print' connector. The fragments are joined with an empty
        // separator, so the resulting statement text is exactly their concatenation.
        String sink = String.join("",
                "CREATE TABLE sink_table (\n",
                "    window_start string,\n",
                "    window_end   string,\n",
                "    region_id      bigint,\n",
                "    province_id      bigint,\n",
                "    city_id      bigint,\n",
                "    district_id      bigint,\n",
                "    code      bigint,\n",
                "    category1_id      bigint,\n",
                "    category2_id      bigint,\n",
                "    category3_id      bigint,\n",
                "    uv      bigint,\n",
                "    uv1      bigint,\n",
                "    uv2      bigint,\n",
                "    uv3      bigint,\n",
                "    uv4      bigint,\n",
                "    uv5      bigint,\n",
                "    uv6      bigint\n",
                ") WITH (\n",
                "  'connector' = 'print'\n",
                ")");

        String sql =
                "insert into sink_table \n" +
                        "SELECT  \n" +
                        "cast(window_start as string) as window_start, \n" +
                        "cast(window_end as string) as window_end, \n" +
                        "region_id, \n" +
                        "province_id, \n" +
                        "city_id, \n" +
                        "district_id, \n" +
                        "code, \n" +
                        "category1_id, \n" +
                        "category2_id, \n" +
                        "category3_id, \n" +
                        "count(distinct union_id) as uv, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id in 
('1', '2')) AS uv1, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id 
='1') AS uv2, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id 
='3') AS uv3, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id 
='1' and channel2_id='1') AS uv4, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id 
='3' and channel2_id='1') AS uv5, \n" +
                        "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id 
='1' and origin_sku_id = sku_id) AS uv6 \n" +
                        "FROM TABLE(CUMULATE(TABLE source_table, 
DESCRIPTOR(row_ltz), INTERVAL '10' MINUTES, INTERVAL '1' DAY)) \n" +
                        //"GROUP BY window_start, 
window_end,code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id";
                        "GROUP BY window_start, window_end," +
                        "GROUPING SETS (\n" +
                        
"(code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n"
 +
                        
"(code,region_id,province_id,city_id,district_id,category1_id,category2_id),\n" 
+
                        
"(code,region_id,province_id,city_id,district_id,category1_id),\n" +
                        "\n" +
                        
"(code,region_id,province_id,city_id,category1_id,category2_id,category3_id),\n"
 +
                        
"(code,region_id,province_id,category1_id,category2_id,category3_id),\n" +
                        
"(code,region_id,category1_id,category2_id,category3_id),\n" +
                        "\n" +
                        "(code,region_id,province_id,city_id,district_id),\n" +
                        "(code,region_id,province_id,city_id),\n" +
                        "(code,region_id,province_id),\n" +
                        "(code,region_id),\n" +
                        "\n" +
                        "(code,category1_id,category2_id,category3_id),\n" +
                        "(code,category1_id,category2_id),\n" +
                        "(code,category1_id),\n" +
                        "\n" +
                        "(code),\n" +
                        "\n" +
                        "\n" +
                        
"(region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n"
 +
                        
"(region_id,province_id,city_id,district_id,category1_id,category2_id),\n" +
                        
"(region_id,province_id,city_id,district_id,category1_id),\n" +
                        "\n" +
                        
"(region_id,province_id,city_id,category1_id,category2_id,category3_id),\n" +
                        
"(region_id,province_id,category1_id,category2_id,category3_id),\n" +
                        "(region_id,category1_id,category2_id,category3_id),\n" 
+
                        "\n" +
                        "(region_id,province_id,city_id,district_id),\n" +
                        "(region_id,province_id,city_id),\n" +
                        "(region_id,province_id),\n" +
                        "(region_id),\n" +
                        "\n" +
                        "(category1_id,category2_id,category3_id),\n" +
                        "(category1_id,category2_id),\n" +
                        "(category1_id),\n" +
                        "()\n" +
                        ");";
        // Order matters: run the CREATE TABLE DDL first so that sink_table exists
        // when the INSERT statement below is resolved.
        tableEnv.executeSql(sink);
        // Run the windowed aggregation; its INSERT INTO targets sink_table.
        tableEnv.executeSql(sql);
        

}

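// "Reply" (presumably leftover text from the page this snippet was copied from; not part of the program)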