// Parse every raw JSON string into a DataSource and stamp it with a millisecond
// event timestamp derived from its log_creation_time string.
DataStream<DataSource> skuDataStream = stream
        .map((MapFunction<String, DataSource>) s -> {
            DataSource ret = JSONObject.parseObject(s, DataSource.class);
            ret.setEvent_ts(dateTime.parse(ret.getLog_creation_time(), pattern).getMillis());
            return ret;
        })
        // Keep only records whose numeric ids are all positive and that carry a union_id.
        .filter((FilterFunction<DataSource>) parsed ->
                parsed.getregion_id() > 0
                        && parsed.getprovince_id() > 0
                        && parsed.getCity_id() > 0
                        && parsed.getDistrict_id() > 0
                        && parsed.getCode() > 0
                        && parsed.getCategory1_id() > 0
                        && parsed.getCategory2_id() > 0
                        && parsed.getCategory3_id() > 0
                        && parsed.getUnion_id() != null);

// Table schema for the stream: explicit column types plus a computed event-time
// column (row_ltz, built from event_ts with millisecond precision) whose watermark
// trails the event time by 10 seconds.
Schema schema = Schema.newBuilder()
        .column("code", "bigint")
        .column("flow_source", "string")
        .column("origin_sku_id", "bigint")
        .column("sku_id", "bigint")
        .column("region_id", "bigint")
        .column("province_id", "bigint")
        .column("city_id", "bigint")
        .column("district_id", "bigint")
        .column("category1_id", "bigint")
        .column("category2_id", "bigint")
        .column("category3_id", "bigint")
        .column("union_id", "string")
        .column("channel1_id", "string")
        .column("channel2_id", "string")
        .columnByExpression("row_ltz", Expressions.callSql("TO_TIMESTAMP_LTZ(event_ts, 3)"))
        .watermark("row_ltz", "row_ltz - INTERVAL '10' SECOND")
        .build();

// Expose the stream to SQL under the name source_table.
Table inputTable = tableEnv.fromDataStream(skuDataStream, schema);
tableEnv.createTemporaryView("source_table", inputTable);

// Sink DDL: one row per window and dimension combination, written with the
// 'print' connector.
String sink = "CREATE TABLE sink_table (\n"
        + " window_start string,\n"
        + " window_end string,\n"
        + " region_id bigint,\n"
        + " province_id bigint,\n"
        + " city_id bigint,\n"
        + " district_id bigint,\n"
        + " code bigint,\n"
        + " category1_id bigint,\n"
        + " category2_id bigint,\n"
        + " category3_id bigint,\n"
        + " uv bigint,\n"
        + " uv1 bigint,\n"
        + " uv2 bigint,\n"
        + " uv3 bigint,\n"
        + " uv4 bigint,\n"
        + " uv5 bigint,\n"
        + " uv6 bigint\n"
        + ") WITH (\n"
        + " 'connector' = 'print'\n"
        + ")";

// Distinct union_id counts (overall and per channel filter) over a CUMULATE window
// on row_ltz with a 10-minute step and a 1-day maximum size. window_start and
// window_end are always grouping keys; GROUPING SETS then enumerates the dimension
// combinations to aggregate, from the finest grain down to the grand total "()".
// NOTE(review): the statement carries no trailing ';' because executeSql takes a
// single statement and some Flink releases appear to reject the terminator as a
// parse error - confirm against the Flink version in use.
String sql = "insert into sink_table \n"
        + "SELECT \n"
        + "cast(window_start as string) as window_start, \n"
        + "cast(window_end as string) as window_end, \n"
        + "region_id, \n"
        + "province_id, \n"
        + "city_id, \n"
        + "district_id, \n"
        + "code, \n"
        + "category1_id, \n"
        + "category2_id, \n"
        + "category3_id, \n"
        + "count(distinct union_id) as uv, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id in ('1', '2')) AS uv1, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1') AS uv2, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3') AS uv3, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and channel2_id='1') AS uv4, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3' and channel2_id='1') AS uv5, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and origin_sku_id = sku_id) AS uv6 \n"
        + "FROM TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_ltz), INTERVAL '10' MINUTES, INTERVAL '1' DAY)) \n"
        + "GROUP BY window_start, window_end,"
        + "GROUPING SETS (\n"
        // code x geography x category
        + "(code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n"
        + "(code,region_id,province_id,city_id,district_id,category1_id,category2_id),\n"
        + "(code,region_id,province_id,city_id,district_id,category1_id),\n"
        + "\n"
        + "(code,region_id,province_id,city_id,category1_id,category2_id,category3_id),\n"
        + "(code,region_id,province_id,category1_id,category2_id,category3_id),\n"
        + "(code,region_id,category1_id,category2_id,category3_id),\n"
        + "\n"
        // code x geography
        + "(code,region_id,province_id,city_id,district_id),\n"
        + "(code,region_id,province_id,city_id),\n"
        + "(code,region_id,province_id),\n"
        + "(code,region_id),\n"
        + "\n"
        // code x category
        + "(code,category1_id,category2_id,category3_id),\n"
        + "(code,category1_id,category2_id),\n"
        + "(code,category1_id),\n"
        + "\n"
        // code only
        + "(code),\n"
        + "\n"
        + "\n"
        // the same combinations without code
        + "(region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n"
        + "(region_id,province_id,city_id,district_id,category1_id,category2_id),\n"
        + "(region_id,province_id,city_id,district_id,category1_id),\n"
        + "\n"
        + "(region_id,province_id,city_id,category1_id,category2_id,category3_id),\n"
        + "(region_id,province_id,category1_id,category2_id,category3_id),\n"
        + "(region_id,category1_id,category2_id,category3_id),\n"
        + "\n"
        + "(region_id,province_id,city_id,district_id),\n"
        + "(region_id,province_id,city_id),\n"
        + "(region_id,province_id),\n"
        + "(region_id),\n"
        + "\n"
        + "(category1_id,category2_id,category3_id),\n"
        + "(category1_id,category2_id),\n"
        + "(category1_id),\n"
        // grand total per window
        + "()\n"
        + ")";

// Register the sink first, then submit the INSERT that feeds it.
tableEnv.executeSql(sink);
tableEnv.executeSql(sql);
}