注意时区哦,SQL层默认使用UTC的long值
On Thu, Oct 29, 2020 at 12:12 PM [email protected] <
[email protected]> wrote:
> 我把sink.partition-commit.trigger 设置成process-time 可以看到数据;
> 但是我后来设置source 产生出watermark 还是不行;
> val dataStream = streamEnv.addSource(new MySource)
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
> .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
> override def extractTimestamp(element: UserInfo, recordTimestamp:
> Long): Long = element.getTs.getTime
> }))
> 生成的UserInfo类型的ts是时间戳,所以watermark使用的是它提取的时间戳
>
>
>
> [email protected]
>
> 发件人: Jingsong Li
> 发送时间: 2020-10-28 16:29
> 收件人: user-zh
> 主题: Re: flink hive Streaming查询不到数据的问题
> Hi,
>
> 你的Source看起来并没有产出watermark,所以:
>
> 你可以考虑使得Source产出正确的watermark,或者使用'sink.partition-commit.trigger'的默认值process-time。
>
> Best,
> Jingsong
>
> On Wed, Oct 28, 2020 at 4:13 PM [email protected] <
> [email protected]> wrote:
>
> > 你好:
> > 我现在在使用flink 1.11.2版本 hive1.1.0 版本;
> > 当我在使用flink hive streaming的时候,发现按照示例写数据到hive,
> > 可以看到指定目录下已经生成了文件,但是使用hive查询没有数据;
> > 好像是分区信息没有提交到hive meta store;但是官网文档说已经实现了这个功能,我操作却不行;
> > 下面是我的代码
> > object StreamMain {
> > def main(args: Array[String]): Unit = {
> > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > streamEnv.setParallelism(3)
> >
> > val tableEnvSettings = EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build()
> >
> > val tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings)
> >
> >
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
> >
> >
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
> >
> > val dataStream = streamEnv.addSource(new MySource)
> >
> > val catalogName = "my_catalog"
> > val catalog = new HiveCatalog(
> > catalogName, // catalog name
> > "yutest", // default database
> >
> > "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive
> config (hive-site.xml) directory
> > "1.1.0" // Hive version
> > )
> > tableEnv.registerCatalog(catalogName, catalog)
> > tableEnv.useCatalog(catalogName)
> >
> > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > tableEnv.useDatabase("yutest")
> >
> >
> > tableEnv.createTemporaryView("users", dataStream)
> > tableEnv.executeSql("DROP TABLE IF EXISTS fs_table ")
> > // 如果hive中已经存在了相应的表,则这段代码省略
> > val hiveSql = """CREATE external TABLE fs_table (
> > user_id STRING,
> > order_amount DOUBLE
> > )
> > partitioned by(
> > dt string,
> > h string,
> > m string) stored as parquet
> > TBLPROPERTIES (
> >
> > 'partition.time-extractor.timestamp-pattern'='$dt
> $h:$m:00',
> > 'sink.partition-commit.delay'='0s',
> > 'sink.partition-commit.trigger'='partition-time',
> >
> >
> 'sink.partition-commit.policy.kind'='metastore,success-file'
> > )""".stripMargin
> > tableEnv.executeSql(hiveSql)
> >
> >
> > val insertSql = "insert into fs_table SELECT userId, amount, " + "
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM users"
> > tableEnv.executeSql(insertSql)
> > }
> > }
> > public class MySource implements SourceFunction<UserInfo> {
> > private volatile boolean run = true;
> > String userids[] = {
> >
> > "4760858d-2bec-483c-a535-291de04b2247",
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
> >
> > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
> >
> > "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
> >
> > "3ebfb9602ac07779||3ebfe9612a007979",
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
> >
> > "e7e896cd939685d7||e7e8e6c1930689d7",
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> > };
> >
> > @Override
> >
> > public void run(SourceFunction.SourceContext<UserInfo>
> sourceContext) throws Exception {
> >
> > while (run) {
> >
> > String userid = userids[(int) (Math.random() *
> (userids.length - 1))];
> > UserInfo userInfo = new UserInfo();
> > userInfo.setUserId(userid);
> > userInfo.setAmount(Math.random() * 100);
> > userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> > sourceContext.collect(userInfo);
> > Thread.sleep(100);
> > }
> > }
> >
> > @Override
> > public void cancel() {
> > run = false;
> > }
> > }
> > public class UserInfo implements Serializable {
> > private String userId;
> > private Double amount;
> > private Timestamp ts;
> >
> > public String getUserId() {
> > return userId;
> > }
> >
> > public void setUserId(String userId) {
> > this.userId = userId;
> > }
> >
> > public Double getAmount() {
> > return amount;
> > }
> >
> > public void setAmount(Double amount) {
> > this.amount = amount;
> > }
> >
> > public Timestamp getTs() {
> > return ts;
> > }
> >
> > public void setTs(Timestamp ts) {
> > this.ts = ts;
> > }
> > }
> >
> > hive (yutest)>
> > >
> > > show partitions fs_table;
> > OK
> > partition
> > Time taken: 20.214 seconds
> >
> > ------------------------------
> > [email protected]
> >
>
>
> --
> Best, Jingsong Lee
>
--
Best, Jingsong Lee