?????? ??????????

2021-06-07 文章 MOBIN
Flink??Flink SQL??set



| |
MOBIN
|
??


??2021??02??1?? 18:06?L<1039601...@qq.com> ??
,??1.12?? EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnv = 
StreamTableEnvironment.create(executionEnvironment, settings);
Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms");




----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

best,
amenhub



nbsp;
 ???L
?? 2021-02-01 17:20
 user-zh
?? ??
flink1.12, kafka??3??, flink??3??. ??, 
, ??,, 
?

回复: flink自定义connector相关报错

2021-06-02 文章 MOBIN


Sorry, 工程下路径是没错的,是我发邮件时打错了,还可能是什么原因导致的呢?谢谢
src/main/resources/META-INF/services
| |
MOBIN
|
签名由网易邮箱大师定制


在2021年06月2日 17:05,Leonard Xu 写道:
路径错了

在 2021年6月2日,17:02,MOBIN <18814118...@163.com> 写道:

META-INF.services/org.apache.flink.table.factories.Factory

=>   META-INF/services/org.apache.flink.table.factories.Factory

祝好
Leonard

flink自定义connector相关报错

2021-06-02 文章 MOBIN


请教下,在自定义connector,IDEA上直接运demo时报了以下的错误:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' 
in
the classpath.


工程的resource目录也准确引入了META-INF.services/org.apache.flink.table.factories.Factory,但是感觉没生效一样
Tabel-common依赖也已经引入了
谢谢
| |
MOBIN
|
签名由网易邮箱大师定制



回复:窗口函数使用的时间类型

2021-06-01 文章 MOBIN
其次可以参考下阿里的demo:
https://help.aliyun.com/document_detail/62512.html?spm=a2c4g.11186623.6.827.49531b09XfgsU7




| |
MOBIN
|
签名由网易邮箱大师定制


在2021年06月1日 19:37,guoyb<861277...@qq.com> 写道:
好的,谢谢!


我试试



---原始邮件---
发件人: "Shuo Cheng"

回复:窗口函数使用的时间类型

2021-06-01 文章 MOBIN
是不是报的类似下面的错?
Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered


| |
MOBIN
|
签名由网易邮箱大师定制


在2021年06月1日 19:00,guoyb<861277...@qq.com> 写道:
tumble() 开窗,需要的事件时间到底需要什么时间类型?一直报时间不对
timestamp(3)
datetime
time
都试过了,没有一个对的。

回复: prometheus metric中如何设置label

2021-05-12 文章 MOBIN


感觉labe value过滤考虑的情况比较少
private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:]");

只允许数字、字母及冒号,其实自定义label的场景下,带/的情况也有【比如文件路径】、带.的情况也有【主机IP】
| |
MOBIN
|
签名由网易邮箱大师定制


在2021年05月11日 15:04,Tony Wei 写道:
Hi,

1、是否是这种方式增加label

是的,MetricGroup#addGroup(key, value) 的設計其中一個目的就是為了支援 prometheus 的 label 。


2、由于采用了add group的方式,导致exp对应的值里面的 ‘.’ 变成了下划线,是否有办法保持为'.'

可以透過配置 filterLabelValueCharacters: false 來關閉過濾功能 [1],但使用者需要自行確保不會有非法字元混入
label value 之中。詳細可參考文檔說明。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#prometheus
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#prometheus>

best regards,

suisuimu <726400...@qq.com> 於 2021年5月10日 週一 下午6:32寫道:

基于RichMapFunction自定义Prometheus metric时增加label
```
counter = this.metricGroup
.addGroup("app", value.getAppName())
.addGroup("exp", value.getExceptionName())
.counter("myExpCounter");
```
通过add group可以在metric中看到label信息
flink_taskmanager_job_task_operator_app_exp_myExpCounter{app="",endpoint="pushgateway",exp="java_io_IOException",flink_namespace="xxx",host="11_7_9_11",job="xxx",job_id="xxx",job_name="custom_log",namespace="monitoring",operator_id="cf155f65686cb012844f7c745ec70a3c",operator_name="Map",pod="pushgateway-c7648cd5c-tvfb9",service="pushgateway",subtask_index="0",task_attempt_id="7d6fd088c0628eb564753939978086eb",task_attempt_num="0",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_name="Source:_Custom_SourceMapProcessMapSink:_Print_to_Std__Out",tm_id="10_7_9_71:6122_96edca"}

想问下:
1、是否是这种方式增加label
2、由于采用了add group的方式,导致exp对应的值里面的 ‘.’ 变成了下划线,是否有办法保持为'.'



--
Sent from: http://apache-flink.147419.n8.nabble.com/



??????flink cdc ????mysql binlog?????? streaming????????????????????reload????????????????????

2021-04-27 文章 MOBIN
datastreaming 
API??debeziumdebezium??
public static Properties debeziumProperties(){
Properties properties = new Properties();
properties.setProperty(????,??");
return properties;
}


SourceFunction sourceFunction = MySQLSource.builder()
. . .
.debeziumProperties(debeziumProperties())
.build();


| |
MOBIN
|
|
18814118...@163.com
|
??


??2021??04??27?? 14:46<1764232...@qq.com> ??
hi all

??flink cdc??streaming 
mode??binlog??mysql??RELOADsqlcdc
debezium.snap.shot.locking.mode = none


streaming 
mode??reload??flink 
cdc??
Properties properties = new Properties();
properties.setProperty("debezium.snapshot.locking.mode", "none");
SourceFunction