Re: Flink 连接Hive hiveConf路径配置

2020-12-18 文章 Jacob
谢谢回复

我想的也是如此,提交到hadoop集群,也应该能读到hive的conf dir

于是询问了DBA hive conf的路径为:/etc/hive/conf

我demo如下

public class FlinkHiveIntegration1 {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "nydatabase";
String hiveConfDir = "/etc/hive/conf";  // hive-site.xml路径
String version = "1.1.0-cdh5.8.3";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);

tableEnv.registerCatalog(name, hive);
tableEnv.useCatalog(name);
tableEnv.useDatabase(defaultDatabase);

String hql = "select * from flink2hive_test";
tableEnv.executeSql(hql);
}
}

提交Job命令为:
   
./bin/flink run -m yarn-cluster -Djobmanager.memory.process.size=4096m
-Dtaskmanager.memory.process.size=4096m -Dtaskmanager.numberOfTaskSlots=1
-Dyarn.application.name="Flink_to_hive" -c
com.test.flink.function.FlinkHiveIntegration1
/opt/app/Flink/jar/1.11.2/flink-0.0.1-SNAPSHOT.jar --parallelism 1

报错如下:   
  
2020-12-18 23:41:33,188 FATAL org.apache.hadoop.conf.Configuration  
  
[] - error parsing conf file:/etc/hive/conf/hive-site.xml
java.io.FileNotFoundException: /etc/hive/conf/hive-site.xml (No such file or
directory)

...
...
..
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: java.io.FileNotFoundException: /etc/hive/conf/hive-site.xml
(No such file or directory)

提示文件不存在。





-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11.2检查点失败

2020-12-18 文章 r pp
我觉得补充完整的 故障信息,以及你的资源配置信息,实例代码 可以更好的让别人回答你的问题

zhy  于2020年12月18日周五 下午4:07写道:

>
> 补充一下,状态后端选择的是rocksdb,检查点间隔为15分钟,超时时间为5分钟,感觉5分钟超时已经很大了,结果检查点线程还是会被中断,是需要继续调大超时时间吗
>
> zhy  于2020年12月18日周五 下午3:57写道:
>
> > hi、
> >
> >
> 我这面在使用flink1.11.2做实时特征的时候,状态大小大约在30g左右的时候任务就不能继续运行了,而查看异常日志发现大量的InterruptedException,请问这种情况是集群的问题还是flink的问题,而另一个3G状态的任务依然正常运行
> >
>


Re: flink 1.11 interval join场景下rocksdb内存超用问题

2020-12-18 文章 r pp
你好,能否把  promethus上metrics,   rocksdb_block_cache_usage的大小不断上升的
截图发一下,其它rocksdb 的内存图 如果有的话,也发一下

开始时间  到 结束时间  3个 小时的。




867127831 <867127...@qq.com> 于2020年12月18日周五 下午3:15写道:

> Hi,
>
>
> 我在flink 1.11 on k8s上运行了一个双流join的sql,使用rocksdb作为backend,flink
> managed部分的内存由flink托管(state.backend.rocksdb.memory.managed=true),但是发现k8s的pod的内存消耗一直在增加。具体情况如下:
>
>
> flink sql:
>
>
> insert into console_sink
> select t1.*, t2.*
> from t1 left join t2
> on t1.unique_id = t2.unique_id
> and t1.event_time BETWEEN t2.event_time - INTERVAL '1' HOUR AND
> t2.event_time + INTERVAL '1' HOUR
>
>
>
> 属性配置:
> state.backend=rocksdb;
> state.backend.incremental=false;
> state.backend.rocksdb.memory.managed=true
> state.idle.retention.mintime='10 min';
> state.idle.retention.maxtime='20 min';
> checkpoint.time.interval='15 min';
> source.idle-timeout='6 ms';
>
> taskmanager.memory.flink.size =55 gb
> taskmanager.memory.managed.fraction=0.85
>
>
>
>
>
>
> 运行现象:
> 1. checkpoint的size稳定在200G左右,说明state是有过期释放的。
> 2. k8s pod的使用内存不断增加,没有下降下来的趋势,最终整个pod的内存使用量超过pod内存上限,导致pod被杀掉。
> 3. 通过采集promethus上metrics,
> 发现rocksdb_block_cache_usage的大小不断上升,最终达到rocksdb_block_cache_capacity的上限。并且rocksdb_block_cache_usage的大小远远超过了flink
> managed部分内存的大小。
>
>
> 想知道,为什么在flink全托管rocksdb的情况下,为什么会出现rocksdb_block_cache_usage这个指标一直增长而不降低呢?


Re: Flink 1.12 job on yarn 集成hive时如何配置 hiveConf

2020-12-18 文章 r pp
这个问题 ,一个很朴素的思路 ,你集群里面的在哪里 ,就填哪里咯

Jacob <17691150...@163.com> 于2020年12月18日周五 下午4:13写道:

> Dear all,
>
> 请问在flink在集成hive时候,需要配置hive的conf目录,我的job是on yarn提交的,那么如何配置这个hive conf路径呢?
>
> String name = "myhive";
> String defaultDatabase = "mydatabase";
> String hiveConfDir = "";  // hive-site.xml路径
> String version = "1.1.0-cdh5.8.3";
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: jdbc sink无法插入数据

2020-12-18 文章 r pp
一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
去掉kafka sink ,看下 写入效果。
再对比下 加入kafka 后的效果。

一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了

guoliubi...@foxmail.com  于2020年12月18日周五 下午2:01写道:

> Hi,
>
> 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> .process(new ProcessFunction() {
> @Override
> public void processElement(RatioValuevalue, Context ctx,
> Collector out) throws Exception {
> out.collect(value);
> ctx.output(ratioOutputTag, value);
> }
> });
> sideStream.addSink(new FlinkKafkaProducer<>(
> "ratio_value",
> new RatioValueSerializationSchema(suffix),
> PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> tool.get(SCHEMA_REGISTRY_URL)),
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> DataStream ratioSideStream =
> sideStream.getSideOutput(ratioOutputTag);
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> 想问下这种情况是否有什么排查手段?
>
>
> guoliubi...@foxmail.com
>


Table api ??????rowtime??????

2020-12-18 文章 ?g???U?[????

 
??DataStream??Tablerowtime??udtf??sqltableEnv.sql()Rowtime
 attributes must not be in the input rows of a regular join. As a workaround 
you can cast the time attributes of input tables to TIMESTAMP 
before.to_timestampeventTimesql??eventTime


sql: select
 tmp.metric_id as 
metric_id,
 
tmp.metric_config as metric_config,
 startLat,
 destName,
 bizType,
 orderId,
 completedTime,
 orderStatus,
 startHexList,
 cityId,
 type,
 destLat,
 endHexList,
 destLng,
 createTime,
 passengerId,
 finishedTime,
 vehicleId,
 startLng,
 startName,
 eventTime
from
 
htw_order_dwd_htw_order_geo_Infos,
 lateral table(
  
metricUdtf('aa')
 ) as 
tmp(metric_id, metric_config)


Thanks
Jiazhi

Re: Flink 连接Hive hiveConf路径配置

2020-12-18 文章 r pp
按照我朴素的思路,你的yarn环境可以读取hiveConf 的信息吧。。。
on Yarn 的提交模式,和本地是不同的
另一种是提交的时候 添加配置项
–files $HIVE_HOME/conf/hive-site.xml

Jacob <17691150...@163.com> 于2020年12月19日周六 上午9:26写道:

> Dears,
>
> flink在连接hive时,需配置hiveConf所在路径
>
> 我已经下载了集群中hive-site.xml文件,不知道应该放在哪个目录
>
> Job部署模式是 on Yarn
> ,请问代码中hiveConf应该放在哪个目录下,应该不是我启动job所在的机器吧?因为job提交后运行在hadoop集群,是无法找到相关目录的。
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-18 文章 r pp
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式

Storm☀️  于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-18 文章 r pp
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式。

Storm☀️  于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.11.2 读写Hive以及对hive的版本支持

2020-12-18 文章 r pp
嗨。提供一个解决的思路
   1.缺包
   2在yarn 的环境下缺包,可以把缺的包 放在集群统一的位置,在提交命令时,指名所在包的位置。

Jacob <17691150...@163.com> 于2020年12月18日周五 下午2:01写道:

> Dear All,
>
> Flink.11.2操作hive时,对hive的版本支持是怎样的
>
>
> 看官网介绍是支持1.0、1.1、1.2、2.0、2.1、2.2、2.3、3.1
> 我的执行环境:
>
> *Flink : 1.11.2*
> *Haoop : 2.6.0-cdh5.8.3*
> *Hive : 1.1.0-cdh5.8.3*
> *Job运行方式 : on yarn*
>
> 同时对读写hive的demo,我不知道我写的是否正确:
>
> public static void main(String[] args) throws Exception {
>
> EnvironmentSettings settings = EnvironmentSettings
> .newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
> String name = "myhive";
> String defaultDatabase = "datafeed";
> String hiveConfDir = "/opt/app/bigdata/hive-1.1.0-cdh5.8.3/conf";
> // hive-site.xml路径
> String version = "1.1.0-cdh5.8.3";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>
> tableEnv.registerCatalog("myhive", hive);
> tableEnv.useCatalog("myhive");
> String createDbSql = "INSERT INTO TABLE flink2hive_test VALUES
> ('55', \"333\", \"CHN\")";
> tableEnv.sqlUpdate(createDbSql);
> }
>
> 这样的job提交到yarn会报错:
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.mapreduce.TaskAttemptContext
>
> 是缺少MapReduce的相关包吗?
>
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


请教个Flink sql问题

2020-12-18 文章 占英华
Flink sql的dml语句可以将结果写入不同的sink表中吗?如果可以可以怎么处理?

Flink 连接Hive hiveConf路径配置

2020-12-18 文章 Jacob
Dears,

flink在连接hive时,需配置hiveConf所在路径

我已经下载了集群中hive-site.xml文件,不知道应该放在哪个目录

Job部署模式是 on Yarn
,请问代码中hiveConf应该放在哪个目录下,应该不是我启动job所在的机器吧?因为job提交后运行在hadoop集群,是无法找到相关目录的。



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Native Kubernetes 需要访问HDFS

2020-12-18 文章 Yang Wang
你可以在Flink client端设置HADOOP_CONF_DIR环境变量即可,这样会自动ship
hadoop的配置并且挂载给JobManager和TaskManager的

Best,
Yang

liujian <13597820...@qq.com> 于2020年12月18日周五 下午5:26写道:

> Hi:
>  在使用Native Kubernetes
> 需要访问HDFS,已经将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib目录
>  但是hdfs是HA,那么就需要hdfs-site.xml等文件了,那么是如何指定这个文件呢 


Re: [ANNOUNCE] Apache Flink 1.11.3 released

2020-12-18 文章 Till Rohrmann
Thanks a lot to everyone who has contributed to this release and in
particular to Gordon and Xintong who did a great job.

Cheers,
Till

On Fri, Dec 18, 2020 at 12:34 PM Paul Lam  wrote:

> Well done! Thanks to Gordon and Xintong, and everyone that contributed to
> the release.
>
> Best,
> Paul Lam
>
> > 2020年12月18日 19:20,Xintong Song  写道:
> >
> > The Apache Flink community is very happy to announce the release of
> Apache Flink 1.11.3, which is the third bugfix release for the Apache Flink
> 1.11 series.
> >
> > Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html <
> https://flink.apache.org/downloads.html>
> >
> > Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> > https://flink.apache.org/news/2020/12/18/release-1.11.3.html <
> https://flink.apache.org/news/2020/12/18/release-1.11.3.html>
> >
> > The full release notes are available in Jira:
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348761
> <
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348761
> >
> >
> > We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> >
> > Regards,
> > Gordon & Xintong
>
>


Re: Re:Re: flink sql作业state size一直增加

2020-12-18 文章 赵一旦
话说为什么会有这类需求呢,感觉flink就不应该有state.retention这种考虑。要么使用time window,要么over
window。干嘛不分窗呢。。

Storm☀️  于2020年12月18日周五 上午11:17写道:

> mini batch默认为false 。题主问题找到了吗
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 请教一下flink1.12可以指定时间清除state吗?

2020-12-18 文章 赵一旦
你这个直接按照天分窗就可以呀。

三色堇 <25977...@qq.com> 于2020年12月18日周五 下午3:20写道:

> Hi,社区的各位大家好:请教一下,我目前生产上使用的flink1.12,根据公司需求,统计每天的日报,每天出一组结果。已经做了group
> by current_date,userId 过程中我发现隔天的flink state未清理,还是在前一天的结果上累加,自己也测试了
> 1、Stream的TTL
> 2、tabEnv.getConfig().setIdleStateRetention(Duration.ofDays(1))这两种方式并不能满足我的需求,请教一下有其他方法可以实现这种日报需求吗?


Re: [ANNOUNCE] Apache Flink 1.11.3 released

2020-12-18 文章 Paul Lam
Well done! Thanks to Gordon and Xintong, and everyone that contributed to the 
release.

Best,
Paul Lam

> 2020年12月18日 19:20,Xintong Song  写道:
> 
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.11.3, which is the third bugfix release for the Apache Flink 1.11 
> series.
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html 
> 
> 
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2020/12/18/release-1.11.3.html 
> 
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348761
>  
> 
> 
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
> 
> Regards,
> Gordon & Xintong



Re: flink1.9.1 支持一个 source 指定消费多个 topics么?

2020-12-18 文章 赵一旦
从1.12开始支持。之前只能通过多张表union成一个view方式实现。

bradyMk  于2020年12月18日周五 下午3:53写道:

> Hi,想请教一下大家:
>
>
> 最近通过flink_taskmanager_job_task_operator_KafkaConsumer_records_consumed_rate指标发现,
> flink某个任务消费一个topic A 竟然比消费topic A,B,C,D一起的指标要高,
> 也就是我四个topic每秒消费的数据竟然还没其中一个topic每秒消费的数据高,
> 所以想请问:flink1.9.1 支持一个 source 指定消费多个 topics么?
> 我的代码如下:
> val A= params.getProperty("kafka.scene.data.topic")
> val B= params.getProperty("kafka.scene.log.topic")
> val C= params.getProperty("kafka.event.topic")
> val D= params.getProperty("kafka.log.topic")
> import scala.collection.JavaConverters._
> val topics = List[String](sceneDataTopic, sceneLogTopic, eventTopic,
> sdkLog).asJava
> env .addSource(new FlinkKafkaConsumer011(topics, new
> JSONKeyValueDeserializationSchema(false), kafkaPro))
>
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


[ANNOUNCE] Apache Flink 1.11.3 released

2020-12-18 文章 Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.3, which is the third bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/12/18/release-1.11.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348761

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gordon & Xintong


Re: Flink eventTIme问题

2020-12-18 文章 Khachatryan Roman
Hi Jiazhi,

Could you share table definitions and both queries?

Regards,
Roman


On Fri, Dec 18, 2020 at 4:39 AM ゞ野蠻遊戲χ  wrote:

> Hi all
>  When I use SQL with UDTF, when I call the tableEnv.sqlQuery ()
> method, I throw the following error: Rowtime attributes must not be in the
> input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before. I used the to_timestamp
> function in eventTIme and it doesn't work, How to solve the problem?
>
> sql: select
>   tmp.metric_id as metric_id,
>   tmp.metric_config as metric_config,
>   startLat,
>   destName,
>   bizType,
>   orderId,
>   completedTime,
>   orderStatus,
>   startHexList,
>   cityId,
>   type,
>   destLat,
>   endHexList,
>   destLng,
>   createTime,
>   passengerId,
>   finishedTime,
>   vehicleId,
>   startLng,
>   startName,
>   eventTime
> from
>   htw_order_dwd_htw_order_geo_Infos,
>   lateral table(
> metricUdtf('aa')
>   ) as tmp(metric_id, metric_config)
>
> Thanks
> Jiazhi
>


Native Kubernetes ????????HDFS

2020-12-18 文章 liujian
Hi:
 ??Native Kubernetes 
HDFS,??flink-shaded-hadoop-2-uber-2.8.3-10.0.jarlib
 
hdfs??HA,??hdfs-site.xml,
 

Flink 1.12 job on yarn 集成hive时如何配置 hiveConf

2020-12-18 文章 Jacob
Dear all,

请问在flink在集成hive时候,需要配置hive的conf目录,我的job是on yarn提交的,那么如何配置这个hive conf路径呢?

String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "";  // hive-site.xml路径
String version = "1.1.0-cdh5.8.3";



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink-SQL-Client 啥时候支持GateWay Mode

2020-12-18 文章 夜思流年梦
Dear developer:


想问下flink-sql client 啥时候支持GateWay Mode呢?


就好像Spark 有spark thrift server ,我们可以通过jdbc方式调用


我在  Apache Flink Home / Flink Improvement Proposals 这个里面看到是有 GateWay Mode 
的计划的,19年7月提的,


想问下这个的具体进度,最近几个版本会有规划吗?


GateWay Mode这个模式很符合业务实际场景, 这样我们可以通过JDBC/Rest API的方式调用,提交SQL;


望各位大神可以透露下进度或者规划

Re: flink1.11.2检查点失败

2020-12-18 文章 zhy
补充一下,状态后端选择的是rocksdb,检查点间隔为15分钟,超时时间为5分钟,感觉5分钟超时已经很大了,结果检查点线程还是会被中断,是需要继续调大超时时间吗

zhy  于2020年12月18日周五 下午3:57写道:

> hi、
>
> 我这面在使用flink1.11.2做实时特征的时候,状态大小大约在30g左右的时候任务就不能继续运行了,而查看异常日志发现大量的InterruptedException,请问这种情况是集群的问题还是flink的问题,而另一个3G状态的任务依然正常运行
>