Re: Re: flink hive Streaming: queries return no data

2020-10-28, Jingsong Li
Watch out for the time zone: the SQL layer uses UTC-based long values by default.
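
To make that concrete, here is a small illustrative sketch (assuming the job's wall clock is Asia/Shanghai, UTC+8, and my reading of the 1.11 default partition-time extractor): DATE_FORMAT writes local wall-clock values into the partition path, the extractor parses those digits back as if they were UTC, so the extracted partition time runs about eight hours ahead of the watermark and partitions commit roughly that much late.

import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

// Illustration only: why 'partition-time' commits lag by the zone offset
// when event timestamps are UTC epoch millis but partition values are local.
object TimezoneGap {
  def main(args: Array[String]): Unit = {
    val eventMillis = System.currentTimeMillis() // watermark basis: UTC epoch millis
    // what DATE_FORMAT renders into dt/h/m: the local wall-clock time
    val wallClock = LocalDateTime.ofInstant(Instant.ofEpochMilli(eventMillis), ZoneId.of("Asia/Shanghai"))
    // what the default extractor turns that back into: the same digits read as UTC
    val partitionTimeMillis = wallClock.toInstant(ZoneOffset.UTC).toEpochMilli
    println(s"partition time leads the watermark by ${(partitionTimeMillis - eventMillis) / 3600000} hours")
  }
}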


Re: Re: flink hive Streaming: queries return no data

2020-10-28, hdxg1101300...@163.com
After setting sink.partition-commit.trigger to process-time I can see the data, but after I then made the source emit watermarks it still did not work:
val dataStream = streamEnv.addSource(new MySource)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
      .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
        override def extractTimestamp(element: UserInfo, recordTimestamp: Long): Long =
          element.getTs.getTime
      }))
The ts field on the generated UserInfo objects is a timestamp, so that is what the watermark is extracted from.
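
A hedged workaround sketch for the time-zone mismatch Jingsong points out in the reply above (not an official fix; it assumes the job runs in Asia/Shanghai, and a custom 'partition.time-extractor.class' would be the cleaner route): shift the watermark by the local offset so it lines up with the wall-clock values that DATE_FORMAT writes into the partition path.

.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
  .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
    override def extractTimestamp(element: UserInfo, recordTimestamp: Long): Long = {
      val epoch = element.getTs.getTime
      // epoch is UTC; add the zone offset so the watermark matches the
      // local wall-clock partition values that get parsed back as UTC
      epoch + java.util.TimeZone.getTimeZone("Asia/Shanghai").getOffset(epoch)
    }
  }))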



hdxg1101300...@163.com
 

Re: flink hive Streaming: queries return no data

2020-10-28, Jingsong Li
Hi,

Your source does not appear to emit any watermarks. Either make the source produce correct watermarks, or use the default value of 'sink.partition-commit.trigger', which is 'process-time'.
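
For the second option, the table properties in the DDL (quoted in the original message at the bottom of this page) would roughly become the sketch below; 'process-time' is the literal option value, and with it a partition is committed once processing time passes the partition's creation time plus the delay, with no watermark needed:

'sink.partition-commit.trigger'='process-time',
'sink.partition-commit.delay'='0s',
'sink.partition-commit.policy.kind'='metastore,success-file'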

Best,
Jingsong



flink hive Streaming: queries return no data

2020-10-28, hdxg1101300...@163.com
Hi:
I am using Flink 1.11.2 with Hive 1.1.0.
When writing data to Hive with Flink's Hive streaming, following the example, I can see that files have been produced under the target directory, but querying from Hive returns no data.
It looks as if the partition information is never committed to the Hive metastore, even though the official docs say this is supported; it does not work for me.
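
One way to confirm that the data files themselves are fine, independent of the commit problem (a sketch, assuming Hive CLI access; MSCK scans the table directory and registers any partitions missing from the metastore):

hive (yutest)> msck repair table fs_table;
hive (yutest)> show partitions fs_table;
hive (yutest)> select * from fs_table limit 10;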
My code is below:
import java.time.Duration

import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.catalog.hive.HiveCatalog

object StreamMain {
  def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)

tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
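
// The streaming Hive/filesystem sink finalizes in-progress files and evaluates
// partition commits only when a checkpoint completes, so checkpointing (every
// 20s here) must be enabled for written data to ever become visible.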

val dataStream = streamEnv.addSource(new MySource)

val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,  // catalog name
  "yutest",// default database
  "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive config 
(hive-site.xml) directory
  "1.1.0"   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useDatabase("yutest")


tableEnv.createTemporaryView("users", dataStream)
tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
//  如果hive中已经存在了相应的表,则这段代码省略
val hiveSql = """CREATE EXTERNAL TABLE fs_table (
                |  user_id STRING,
                |  order_amount DOUBLE
                |) PARTITIONED BY (
                |  dt STRING,
                |  h STRING,
                |  m STRING
                |) STORED AS parquet
                |TBLPROPERTIES (
                |  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
                |  'sink.partition-commit.delay'='0s',
                |  'sink.partition-commit.trigger'='partition-time',
                |  'sink.partition-commit.policy.kind'='metastore,success-file'
                |)""".stripMargin
tableEnv.executeSql(hiveSql)
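
// With 'sink.partition-commit.trigger'='partition-time', a partition commits only
// once watermark >= extracted partition time + 'sink.partition-commit.delay'.
// dataStream above never gets a watermark assigned, so that condition is never
// met: files land on disk, but the metastore never learns about the partitions.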

val insertSql = "insert into fs_table SELECT userId, amount, " +
  "DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users"
tableEnv.executeSql(insertSql)
  }
}
import java.sql.Timestamp;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySource implements SourceFunction<UserInfo> {
    private volatile boolean run = true;

    private final String[] userids = {
            "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
            "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
            "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
            "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
            "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
    };

    @Override
    public void run(SourceFunction.SourceContext<UserInfo> sourceContext) throws Exception {
        while (run) {
            // pick a uniformly random user id across the whole array
            String userid = userids[(int) (Math.random() * userids.length)];
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(userid);
            userInfo.setAmount(Math.random() * 100);
            userInfo.setTs(new Timestamp(System.currentTimeMillis()));
            sourceContext.collect(userInfo);
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        run = false;
    }
}
import java.io.Serializable;
import java.sql.Timestamp;

public class UserInfo implements Serializable {
    private String userId;
    private Double amount;
    private Timestamp ts;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Timestamp getTs() {
        return ts;
    }

    public void setTs(Timestamp ts) {
        this.ts = ts;
    }
}

hive (yutest)> show partitions fs_table;
OK
partition
Time taken: 20.214 seconds



hdxg1101300...@163.com