退订
huangqibing...@163.com
发件人: Chen Virtual
发送时间: 2021-04-08 12:19
收件人: user-zh@flink.apache.org
主题: 退订
退订
退订
发自我的iPhone
退订
我写了一个flink kafka connector的作业,从kafka topicA消费数据,做处理后,又sink回 topicB,
程序正常running中,偶现如下报错:
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at
如题,这个问题之前遇到过,昨天晚上又来一次。
首先我是flink1.12的standalone集群,通过公司的pass平台保证单机上的jm和tm失败后自动重启。
问题表现是,我停止任务,然后基于保存点重启任务。然后WEB-UI出现卡顿转圈,过一会报警显示某个容器的JM/TM失败自动重启。此期间WEB-UI转圈,我一直刷新,会出现突然一瞬间显示(应该是自动pass重启了),然后再刷新就继续转圈了(是某容器又失败,目前来看不是同一台容器一直失败,而是每个容器都轮着来失败)。
想請問在進行temporal join hive 表的時候能否只選擇特定幾行?
從官方文檔
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html,
他是join了整個表。 在flink 是否能想spark join hive 表一樣(見一下例子)?
SELECT * FROM kafka_stream
LEFT JOIN
(
SELECT column1, column2 FROM hive_table
) AS a
我在阿里云k8s上部署flink on native kubernetes application,默认用的服务暴露类型是
LoadBalancer,启动后会在公网暴露rest
url。运维管理人员不允许这样,说是只能使用固定预先申请的几个SLB,但我在flink官网没有找到有参数设置LoadBalancerIP,这样情况要怎么实现?
我希望能将某些维度下过去24小时的每一小时的统计结果计算出,然后合并保存在一个map中
在写SQL时,我尝试将多条计算结果合并保存至Map中:
create table to_redis(
biz_name STRING,
mchnt_id STRING,
zb_value MAP
) WITH (
'connector' = 'redis',
'redis-mode' = 'single',
'host' = '172.30.251.225',
'port' = '10006',
'password' = 'xxx',
我希望能将某些维度下过去24小时的每一小时的统计结果计算出,然后合并保存在一个map中
在写SQL时,我尝试将多条计算结果合并保存至Map中:
create table to_redis(
biz_name STRING,
mchnt_id STRING,
zb_value MAP
) WITH (
'connector' = 'redis',
'redis-mode' = 'single',
'host' = '172.30.251.225',
'port' = '10006',
'password' =
中间还有这样的错误:
20:14:48,707 WARN org.apache.kafka.common.utils.AppInfoParser
[] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.producer:type=app-info,id="producer-Source: Custom Source -> Flat Map
-> Map -> Sink:
我写了一个stream程序,从kafka
topicA中读取数据,进行简单预处理后,sink回kafka的topicB,程序本地正常running,但是中间报了一些错误如下:
-
20:11:47,078 INFO org.apache.kafka.clients.Metadata
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink:
Dear 开发者:
目前发现从kafka sink到 hbase 会丢数据,相同的sql ,如果用jdbc方式 来sink 则不会丢失数据,具体建表sql 和任务sql 如下
flink 版本 1.12
源表: 使用canal-json 接入
create table rt_ods.ods_za_log_member_base_info(
MemberId bigint COMMENT '用户ID',
NickName string COMMENT '用户昵称',
proctime as PROCTIME()
) WITH (
flink??api??
1.connectkeyedstream??key join??
2.coprocessfunction ?? keyedcoprocessfunction
Flink 1.12.0?? Filesystem hdfs
orcEnvironmentSettings
settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String createSql = "CREATE
Hi??
flink-1.13??kafka
flink-examplestream-WordCount??kafkaidea??mvnjarflink
binarystandlone flink??,??
15 matches
Mail list logo