各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL 设置事件时间有三种方式: 1、在 DDL 中定义 2、在 DataStream 到 Table 转换时定义 3、使用 TableSource 定义 而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印? 另外说明,我的第一版代码是将Table转换为DataSteam,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中 toDataSteam()方法的注释上也说“表生态系统的类型系统比DataStream API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。 以下是我的第一版代码 // flink 集成 hive System.out.println("初始化Flink环境"); String hiveVersion = "3.1.2"; String catalogName = "myhive"; String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302"; String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf"; EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
System.out.println("定义hive环境"); // 定义 hive catalog 参数:catalog名称、数据库名称、对象名称 HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion); tableEnv.registerCatalog(catalogName, hive); // 将 HiveCatalog 设置为 session 的当前 catalog tableEnv.useCatalog(catalogName); tableEnv.useDatabase(defaultDatabase); // 设置 hive 并行度 Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.setInteger("table.exec.hive.infer-source-parallelism.max", sourceParallelism); // Default 1000 // 使用 HiveTableSource System.out.println("定义查询条件"); // 定义查询条件 Table table = tableEnv .from(catalogName + "." + databaseName + "." + tableName) .select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + DAY + "," + HOUR) .filter($(YEAR).isEqual(year)) .filter($(MONTH).isEqual(startMonth)) .filter($(DAY).isGreaterOrEqual(startDay)) .filter($(HOUR).isGreaterOrEqual(startHour)) .filter($(DAY).isLessOrEqual(endDay)) .filter($(HOUR).isLessOrEqual(endHour)); tableEnv.createTemporaryView("myTable", table); // Table 转 Stream,非常耗时 System.out.println("Table to Stream"); DataStream<Row> resultStream = tableEnv.toDataStream(table); // 水印及窗口设置 System.out.println("水印及窗口"); resultStream .assignTimestampsAndWatermarks(WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((SerializableTimestampAssigner<Row>) (element, recordTimestamp) -> { long datetime = 0; try { datetime = new SimpleDateFormat(DATEFORMAT) .parse(element.getFieldAs(DATETIME).toString()) .getTime(); } catch (ParseException e) { e.printStackTrace(); } return datetime; })) .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime))) | | ZhaoShuKang | | chuckzha...@163.com |