Re: Re: flink hive streaming: query returns no data
Watch out for the time zone: at the SQL layer, the default is a UTC-based long value.

On Thu, Oct 29, 2020 at 12:12 PM hdxg1101300...@163.com <hdxg1101300...@163.com> wrote:

> I set sink.partition-commit.trigger to process-time and then I can see the data,
> but when I later made the source emit watermarks it still did not work. The ts
> field of the generated UserInfo is a timestamp, and that is what the watermark
> is extracted from.
Re: Re: flink hive streaming: query returns no data
I set sink.partition-commit.trigger to process-time and then I can see the data; but when I later made the source emit watermarks it still does not work:

val dataStream = streamEnv.addSource(new MySource)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
      .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
        override def extractTimestamp(element: UserInfo, recordTimestamp: Long): Long =
          element.getTs.getTime
      }))

The ts field of the generated UserInfo is a timestamp, so that is what the watermark is extracted from.

hdxg1101300...@163.com

From: Jingsong Li
Sent: 2020-10-28 16:29
To: user-zh
Subject: Re: flink hive streaming: query returns no data

Hi,

Your source does not appear to emit any watermark, so:

you could either make the source produce a correct watermark, or use the default value of 'sink.partition-commit.trigger', proc-time.

Best,
Jingsong
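For background on why process-time shows data while partition-time does not: with the 'partition-time' trigger, the filesystem/Hive sink commits a partition only once the sink's watermark passes the time extracted from the partition values plus 'sink.partition-commit.delay'. A rough sketch of that condition (the helper is illustrative, not Flink API, and the exact strictness of the comparison is an implementation detail):

    // Hypothetical helper mirroring the documented 'partition-time' commit condition.
    def partitionReady(watermarkMillis: Long, partitionTimeMillis: Long, delayMillis: Long): Boolean =
      watermarkMillis >= partitionTimeMillis + delayMillis

    // No watermark at all means nothing ever commits:
    println(partitionReady(Long.MinValue, 1603958400000L, 0L))                      // false
    // A watermark that lags the partition values by a zone offset delays every commit by that offset:
    println(partitionReady(1603958400000L, 1603958400000L + 8 * 3600 * 1000L, 0L))  // false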
Re: flink hive streaming: query returns no data
Hi,

Your source does not appear to emit any watermark, so:

you could either make the source produce a correct watermark, or use the default value of 'sink.partition-commit.trigger', proc-time.

Best,
Jingsong

On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <hdxg1101300...@163.com> wrote:

> Hello:
> I am using Flink 1.11.2 with Hive 1.1.0. When I write data to Hive with Flink
> Hive streaming following the example, I can see that files are produced under
> the target directory, but querying from Hive returns no data. It looks like
> the partition information is never committed to the Hive metastore, even
> though the documentation says this is supported. My code is below.
>
> --
> hdxg1101300...@163.com

--
Best, Jingsong Lee
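As a sketch of the alternative mentioned above, only the trigger property of the DDL from the original message below needs to change ('process-time' is the spelling the Flink documentation uses for the default; no watermark and no time-extractor pattern are needed in that mode):

    // Assumed variant of the original DDL; everything except the trigger is taken from the thread.
    val procTimeDdl =
      """CREATE EXTERNAL TABLE fs_table (
        |  user_id STRING,
        |  order_amount DOUBLE
        |) PARTITIONED BY (dt STRING, h STRING, m STRING) STORED AS parquet
        |TBLPROPERTIES (
        |  'sink.partition-commit.trigger'='process-time',
        |  'sink.partition-commit.delay'='0s',
        |  'sink.partition-commit.policy.kind'='metastore,success-file'
        |)""".stripMargin
    tableEnv.executeSql(procTimeDdl)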
flink hive streaming: query returns no data
Hello:
I am using Flink 1.11.2 with Hive 1.1.0. When I write data to Hive with Flink Hive streaming following the example, I can see that files are produced under the target directory, but querying from Hive returns no data. It looks like the partition information is never committed to the Hive metastore, even though the documentation says this feature is supported. My code is below.

object StreamMain {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

    tableEnv.getConfig.getConfiguration.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(
      ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

    val dataStream = streamEnv.addSource(new MySource)

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName,                                             // catalog name
      "yutest",                                                // default database
      "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive config (hive-site.xml) directory
      "1.1.0"                                                  // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)

    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("yutest")

    tableEnv.createTemporaryView("users", dataStream)
    tableEnv.executeSql("DROP TABLE IF EXISTS fs_table")
    // If the corresponding table already exists in Hive, this part can be skipped
    val hiveSql =
      """CREATE EXTERNAL TABLE fs_table (
        |  user_id STRING,
        |  order_amount DOUBLE
        |)
        |PARTITIONED BY (
        |  dt STRING,
        |  h STRING,
        |  m STRING
        |) STORED AS parquet
        |TBLPROPERTIES (
        |  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
        |  'sink.partition-commit.delay'='0s',
        |  'sink.partition-commit.trigger'='partition-time',
        |  'sink.partition-commit.policy.kind'='metastore,success-file'
        |)""".stripMargin
    tableEnv.executeSql(hiveSql)

    val insertSql = "insert into fs_table SELECT userId, amount, " +
      "DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users"
    tableEnv.executeSql(insertSql)
  }
}

public class MySource implements SourceFunction<UserInfo> {
    private volatile boolean run = true;
    String[] userids = {
        "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
        "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
        "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
        "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
        "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
    };

    @Override
    public void run(SourceFunction.SourceContext<UserInfo> sourceContext) throws Exception {
        while (run) {
            String userid = userids[(int) (Math.random() * (userids.length - 1))];
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(userid);
            userInfo.setAmount(Math.random() * 100);
            userInfo.setTs(new Timestamp(System.currentTimeMillis()));
            sourceContext.collect(userInfo);
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        run = false;
    }
}

public class UserInfo implements Serializable {
    private String userId;
    private Double amount;
    private Timestamp ts;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Timestamp getTs() {
        return ts;
    }

    public void setTs(Timestamp ts) {
        this.ts = ts;
    }
}

hive (yutest)> show partitions fs_table;
OK
partition
Time taken: 20.214 seconds

--
hdxg1101300...@163.com
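For completeness, a sketch of how the job above could attach the watermark assignment the replies ask for; it mirrors what the poster reports trying in the follow-up, and the time-zone caveat at the top of the thread still applies (import path as in Flink 1.11):

    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

    // Attach event-time timestamps and watermarks before registering the 'users' view.
    val dataStream = streamEnv.addSource(new MySource)
      .assignTimestampsAndWatermarks(
        WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
          .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
            override def extractTimestamp(element: UserInfo, recordTimestamp: Long): Long =
              element.getTs.getTime // epoch millis from java.sql.Timestamp
          }))

    tableEnv.createTemporaryView("users", dataStream)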