Re: flink jdbcsink: question about the number of connections

2023-05-30 Posted by 小昌同学
Hello, one more follow-up: since the number of connections is tied to the parallelism, if the MySQL table being written to has a primary key, does that mean the number of connections, i.e. the parallelism, can only be 1? With several parallel subtasks, could we run into primary-key conflicts?
Thanks for your guidance.
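For context, a sketch of my own (not from the thread; "InputInfo" and its getters mirror the POJO in the code quoted below, and the connection values are placeholders): because the statement is an idempotent REPLACE INTO keyed by the primary key, parallel subtasks do not raise duplicate-key errors, and keying the stream by the primary key before the sink keeps all writes for one key on the same subtask, so per-key ordering is preserved even with parallelism greater than 1.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch only: InputInfo stands in for the POJO used in the quoted code below.
public class KeyedJdbcSinkSketch {
    public static void attachSink(DataStream<InputInfo> outPutInfoStream) {
        outPutInfoStream
                .keyBy(InputInfo::getBreakPrimaryKey) // same primary key -> same sink subtask, keeps per-key order
                .addSink(JdbcSink.sink(
                        "REPLACE INTO InputInfo (breakPrimaryKey, breakCode) VALUES (?, ?)",
                        (stmt, info) -> {
                            stmt.setString(1, info.getBreakPrimaryKey());
                            stmt.setString(2, info.getBreakCode());
                        },
                        JdbcExecutionOptions.builder().withBatchSize(10).build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://host:3306/iap") // placeholder URL
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("user")
                                .withPassword("pass")
                                .build()))
                .name("sink-mysql")
                .setParallelism(2); // > 1 is safe here: REPLACE INTO is idempotent per primary key
    }
}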


小昌同学 <ccc0606fight...@163.com>
---- Original message ----
From: lxk
Date: 2023-05-30 14:30
Subject: Re: flink jdbcsink: question about the number of connections
Hi,
The JDBC connection is created in the SimpleJdbcConnectionProvider class; the actual connection is established by DriverManager.
As for the number of connections, that depends on your parallelism.

On 2023-05-30 13:55:57, "小昌同学" wrote:
Hi all, a question about the number of connections used by the flink JdbcSink:
my code is below. I went through the source code and could not find any setting that controls the number of connections when sinking to MySQL. How should this be configured?
Thanks for your guidance.

[quoted JdbcSink code omitted; identical to the original message "flink jdbcsink: question about the number of connections" below]


小昌同学 <ccc0606fight...@163.com>


Re: flink jdbcsink: question about the number of connections

2023-05-30 Posted by 小昌同学
Got it, thank you for the guidance.


小昌同学 <ccc0606fight...@163.com>
---- Original message ----
From: lxk
Date: 2023-05-30 14:30
Subject: Re: flink jdbcsink: question about the number of connections
[quoted reply and code omitted; identical to the messages below]


Re: flink jdbcsink: question about the number of connections

2023-05-30 Posted by lxk
Hi,
The JDBC connection is created in the SimpleJdbcConnectionProvider class; the actual connection is established by DriverManager.
As for the number of connections, that depends on your parallelism.
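As a concrete illustration (a sketch of my own, not part of the original reply): each parallel sink subtask opens its own connection through SimpleJdbcConnectionProvider, so the sink operator's parallelism bounds the number of MySQL connections. The jdbcSink parameter here stands for the JdbcSink.sink(...) call quoted below.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Sketch: the sink's parallelism is effectively the JDBC connection count.
public class SinkParallelismSketch {
    public static <T> void attach(DataStream<T> stream, SinkFunction<T> jdbcSink) {
        stream.addSink(jdbcSink)
              .name("sink-mysql")
              .setParallelism(2); // 2 parallel sink subtasks -> at most 2 MySQL connections
    }
}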
On 2023-05-30 13:55:57, "小昌同学" wrote:
>Hi all, a question about the number of connections used by the flink JdbcSink:
>my code is below. I went through the source code and could not find any setting that controls the number of connections when sinking to MySQL. How should this be configured?
>Thanks for your guidance.
>
>[quoted JdbcSink code omitted; identical to the original message "flink jdbcsink: question about the number of connections" below]
>
>小昌同学 <ccc0606fight...@163.com>


flink jdbcsink: question about the number of connections

2023-05-29 Posted by 小昌同学
Hi all, a question about the number of connections used by the flink JdbcSink:
my code is below. I went through the source code and could not find any setting that controls the number of connections when sinking to MySQL. How should this be configured?
Thanks for your guidance.

|
outPutInfoStream.addSink(JdbcSink.sink(
"REPLACE  into InputInfo (breakCode, breakName, breakDuration, 
breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs)
 values (?,?,?,?,?,?,?,?,?,?)",
(statement, InPutInfo) -> {
statement.setString(1,InPutInfo.getBreakCode());
statement.setString(2,InPutInfo.getBreakName());
statement.setLong(3,InPutInfo.getBreakDuration());
statement.setString(4,InPutInfo.getBreakRule());
statement.setString(5,InPutInfo.getBreakPrimaryKey());
statement.setString(6, InPutInfo.getBreakStep());
statement.setString(7, InPutInfo.getBreakStepType());
statement.setString(8,InPutInfo.getBreakTime());
statement.setString(9, DateUtil.format(new Date()));
statement.setString(10, 
String.valueOf(InPutInfo.getBreakArgs()));
},
JdbcExecutionOptions.builder()
.withBatchSize(10)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("111")
.build()
)).name("sink-mysql");
|


小昌同学 <ccc0606fight...@163.com>
|

Re: Flink: outputting abnormal data

2023-05-29 Posted by 小昌同学
Hi, the data source is Kafka and I am using the DataStream API.


小昌同学 <ccc0606fight...@163.com>
---- Original message ----
From: Weihua Hu
Date: 2023-05-29 15:29
Subject: Re: Flink: outputting abnormal data
Hi,

What data source are you using? Kafka? And are you on Flink SQL or the DataStream API?

Could you also paste the exception stack trace?

Best,
Weihua


On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:


Hi all, I have a job that has been running for a long time, but recently some dirty records from the source system made it fail; the YARN logs show a NullPointerException. I would like to capture the offending dirty record. Is there a way to do that? Thanks for your guidance.


小昌同学 <ccc0606fight...@163.com>


Re: [ANNOUNCE] Apache Flink 1.16.2 released

2023-05-29 Posted by Jing Ge
Hi Weijie,

Thanks for your contribution and feedback! In case there are some reasons
not to allow us to upgrade them, we still can leverage virtualenv or pipenv
to create a dedicated environment for Flink release. WDYT?

cc Dian Fu

@Dian
I was wondering if you know the reason. Thanks!

Best regards,
Jing




On Mon, May 29, 2023 at 6:27 AM weijie guo 
wrote:

> Hi Jing,
>
> Thank you for caring about the releasing process. It has to be said that
> the entire process went smoothly. We have very comprehensive
> documentation[1] to guide my work, thanks to the contribution of previous
> release managers and the community.
>
> Regarding the obstacles, I actually only have one minor problem: We used an
> older twine(1.12.0) to deploy python artifacts to PyPI, and its compatible
> dependencies (such as urllib3) are also older. When I tried twine upload,
> the process couldn't work as expected as the version of urllib3 installed
> in my machine was relatively higher. In order to solve this, I had to
> proactively downgrade the version of some dependencies. I added a notice in
> the cwiki page[1] to prevent future release managers from encountering the
> same problem. It seems that this is a known issue(see comments in [2]),
> which has been resolved in the higher version of twine, I wonder if we can
> upgrade the version of twine? Does anyone remember the reason why we fixed
> a very old version(1.12.0)?
>
> Best regards,
>
> Weijie
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
>
> [2] https://github.com/pypa/twine/issues/997
>
>
> Jing Ge  于2023年5月27日周六 00:15写道:
>
> > Hi Weijie,
> >
> > Thanks again for your effort. I was wondering if there were any obstacles
> > you had to overcome while releasing 1.16.2 and 1.17.1 that could lead us
> to
> > any improvement wrt the release process and management?
> >
> > Best regards,
> > Jing
> >
> > On Fri, May 26, 2023 at 4:41 PM Martijn Visser  >
> > wrote:
> >
> > > Thank you Weijie and those who helped with testing!
> > >
> > > On Fri, May 26, 2023 at 1:06 PM weijie guo 
> > > wrote:
> > >
> > > > The Apache Flink community is very happy to announce the release of
> > > > Apache Flink 1.16.2, which is the second bugfix release for the
> Apache
> > > > Flink 1.16 series.
> > > >
> > > >
> > > >
> > > > Apache Flink® is an open-source stream processing framework for
> > > > distributed, high-performing, always-available, and accurate data
> > > > streaming applications.
> > > >
> > > >
> > > >
> > > > The release is available for download at:
> > > >
> > > > https://flink.apache.org/downloads.html
> > > >
> > > >
> > > >
> > > > Please check out the release blog post for an overview of the
> > > > improvements for this bugfix release:
> > > >
> > > > https://flink.apache.org/news/2023/05/25/release-1.16.2.html
> > > >
> > > >
> > > >
> > > > The full release notes are available in Jira:
> > > >
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352765
> > > >
> > > >
> > > >
> > > > We would like to thank all contributors of the Apache Flink community
> > > > who made this release possible!
> > > >
> > > >
> > > >
> > > > Feel free to reach out to the release managers (or respond to this
> > > > thread) with feedback on the release process. Our goal is to
> > > > constantly improve the release process. Feedback on what could be
> > > > improved or things that didn't go so well are appreciated.
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Release Manager
> > > >
> > >
> >
>


Re: Flink: outputting abnormal data

2023-05-29 Posted by Weihua Hu
Hi,

What data source are you using? Kafka? And are you on Flink SQL or the DataStream API?

Could you also paste the exception stack trace?

Best,
Weihua


On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:

>
> Hi all, I have a job that has been running for a long time, but recently some dirty records from the source system made it fail; the YARN logs show a NullPointerException. I would like to capture the offending dirty record. Is there a way to do that? Thanks for your guidance.
>
>
> 小昌同学 <ccc0606fight...@163.com>


Custom trigger firing problem

2023-05-29 Posted by 吴先生
I wrote a custom trigger that should emit a result either when maxCount elements have arrived or when the window end time is reached.
Problem: for the same window, onProcessingTime fires several times around the window end, while in theory each window should fire only once when its end time is reached. What could be the reason?
Snippet from the main class:
SingleOutputStreamOperator windowMap =
afterMap.timeWindowAll(Time.seconds(5))
.trigger(new CountAndProcessingTimeTrigger(100))
.process(simpleConfig.getWindowFunction().newInstance())
Trigger code:


public class CountAndProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
// maximum number of elements before the window fires early
private final long maxCount;
private final ReducingStateDescriptor<Long> stateDesc;
public CountAndProcessingTimeTrigger(long maxCount) {
this.stateDesc = new ReducingStateDescriptor<>("count_time", new 
CountAndProcessingTimeTrigger.Sum(),
LongSerializer.INSTANCE);
this.maxCount = maxCount;
}
/**
 * Called for each element added to the window.
 *
 * @param o the element
 * @param timestamp timestamp
 * @param window window
 * @param triggerContext triggerContext
 * @return TriggerResult
 * CONTINUE: do nothing.
 * FIRE: trigger the computation and keep the window contents.
 * PURGE: simply drop the window contents, keeping any window or trigger meta information.
 * FIRE_AND_PURGE: trigger the computation, then clear the elements in the window.
 * @throws Exception Exception
 */
@Override
public TriggerResult onElement(Object o, long timestamp, TimeWindow window, 
TriggerContext triggerContext)
throws Exception {
triggerContext.registerProcessingTimeTimer(window.maxTimestamp());
ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
countState.add(1L);
if (countState.get() >= maxCount) {
log.info("countTrigger: {}", countState.get());
countState.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
/**
 * Window end: called when the registered processing-time timer fires.
 *
 * @param timestamp timestamp
 * @param window window
 * @param triggerContext triggerContext
 * @return TriggerResult
 * @throws Exception Exception
 */
@Override
public TriggerResult onProcessingTime(long timestamp, TimeWindow window, 
TriggerContext triggerContext)
throws Exception {
ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
log.info("timeTrigger: {}, currentProcessingTime:{}", countState.get(),
window.maxTimestamp());
countState.clear();
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long timestamp, TimeWindow window, 
TriggerContext triggerContext)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public boolean canMerge() {
return false;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
ctx.mergePartitionedState(stateDesc);
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
/**
 * Window cleanup: called when the window is purged.
 *
 * @param window window
 * @param triggerContext triggerContext
 * @throws Exception Exception
 */
@Override
public void clear(TimeWindow window, TriggerContext triggerContext) throws 
Exception {
triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
triggerContext.getPartitionedState(stateDesc).clear();
}
/**
 * Reduce function used for counting.
 */
private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
private Sum() {
}
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
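For reference, a minimal usage sketch of my own (not from the thread): timeWindowAll has been deprecated and later removed in newer Flink versions, so this attaches the trigger with an explicit processing-time window assigner; the element and output types are assumptions, and the window function only counts elements so the firing behaviour can be observed.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: 5 s processing-time window over all elements, fired by the custom trigger.
public class TriggerUsageSketch {
    public static void apply(DataStream<Object> afterMap) {
        afterMap
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .trigger(new CountAndProcessingTimeTrigger(100)) // fire at 100 elements or at window end
            .process(new ProcessAllWindowFunction<Object, String, TimeWindow>() {
                @Override
                public void process(Context ctx, Iterable<Object> elements, Collector<String> out) {
                    long count = 0;
                    for (Object ignored : elements) { count++; }
                    out.collect("window [" + ctx.window().getStart() + ", " + ctx.window().getEnd()
                            + ") fired with " + count + " element(s)");
                }
            })
            .print();
    }
}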


吴先生 <15951914...@163.com>

Flink: outputting abnormal data

2023-05-28 Posted by 小昌同学
Hi all, I have a job that has been running for a long time, but recently some dirty records from the source system made it fail; the YARN logs show a NullPointerException. I would like to capture the offending dirty record. Is there a way to do that? Thanks for your guidance.
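One common pattern for this (a sketch of my own, not something proposed in the thread; the parsing step is a stand-in): wrap the fragile step in a ProcessFunction, catch the exception, and route the offending raw record to a side output, so the job keeps running and the dirty record can be logged or sunk somewhere for inspection.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch: keep the main stream clean and capture unparsable records on a side output.
public class DirtyRecordSketch {
    private static final OutputTag<String> DIRTY = new OutputTag<String>("dirty-records") {};

    public static void wire(DataStream<String> rawStream) {
        SingleOutputStreamOperator<Long> parsed = rawStream.process(new ProcessFunction<String, Long>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Long> out) {
                try {
                    out.collect(Long.parseLong(value.trim())); // stand-in for the real parsing logic
                } catch (Exception e) {
                    ctx.output(DIRTY, value); // the bad record, captured instead of failing the job
                }
            }
        });
        parsed.getSideOutput(DIRTY).print("dirty"); // or sink it to Kafka/HDFS for inspection
    }
}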


小昌同学 <ccc0606fight...@163.com>

Re: [ANNOUNCE] Apache Flink 1.16.2 released

2023-05-28 Posted by weijie guo
Hi Jing,

Thank you for caring about the releasing process. It has to be said that
the entire process went smoothly. We have very comprehensive
documentation[1] to guide my work, thanks to the contribution of previous
release managers and the community.

Regarding the obstacles, I actually only have one minor problem: We used an
older twine(1.12.0) to deploy python artifacts to PyPI, and its compatible
dependencies (such as urllib3) are also older. When I tried twine upload,
the process couldn't work as expected as the version of urllib3 installed
in my machine was relatively higher. In order to solve this, I had to
proactively downgrade the version of some dependencies. I added a notice in
the cwiki page[1] to prevent future release managers from encountering the
same problem. It seems that this is a known issue(see comments in [2]),
which has been resolved in the higher version of twine, I wonder if we can
upgrade the version of twine? Does anyone remember the reason why we fixed
a very old version(1.12.0)?

Best regards,

Weijie


[1]
https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

[2] https://github.com/pypa/twine/issues/997


Jing Ge  于2023年5月27日周六 00:15写道:

> Hi Weijie,
>
> Thanks again for your effort. I was wondering if there were any obstacles
> you had to overcome while releasing 1.16.2 and 1.17.1 that could lead us to
> any improvement wrt the release process and management?
>
> Best regards,
> Jing
>
> On Fri, May 26, 2023 at 4:41 PM Martijn Visser 
> wrote:
>
> > Thank you Weijie and those who helped with testing!
> >
> > On Fri, May 26, 2023 at 1:06 PM weijie guo 
> > wrote:
> >
> > > The Apache Flink community is very happy to announce the release of
> > > Apache Flink 1.16.2, which is the second bugfix release for the Apache
> > > Flink 1.16 series.
> > >
> > >
> > >
> > > Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > > streaming applications.
> > >
> > >
> > >
> > > The release is available for download at:
> > >
> > > https://flink.apache.org/downloads.html
> > >
> > >
> > >
> > > Please check out the release blog post for an overview of the
> > > improvements for this bugfix release:
> > >
> > > https://flink.apache.org/news/2023/05/25/release-1.16.2.html
> > >
> > >
> > >
> > > The full release notes are available in Jira:
> > >
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352765
> > >
> > >
> > >
> > > We would like to thank all contributors of the Apache Flink community
> > > who made this release possible!
> > >
> > >
> > >
> > > Feel free to reach out to the release managers (or respond to this
> > > thread) with feedback on the release process. Our goal is to
> > > constantly improve the release process. Feedback on what could be
> > > improved or things that didn't go so well are appreciated.
> > >
> > >
> > >
> > > Regards,
> > >
> > > Release Manager
> > >
> >
>


Re: FlinkSQL: solutions for sliding windows with a large size and a small slide

2023-05-28 Posted by tanjialiang
Hi, Shammon FY.

In theory raising the parallelism can mitigate it, but raising it a lot gets expensive. Writing itself does not need many resources; the problem is that when a window fires the amount of emitted data is huge (the whole user base), and the per-record merge cost is high (each record has to be aggregated into 24*60/5 = 288 windows).

The workarounds I can think of so far:
1. Increase the slide. This fixes the problem at the root (but a longer slide means window results are emitted later).
2. Increase the slide and use the early-fire mechanism to emit windows before they are complete (the emitted data may then no longer match the "last 24 hours" business semantics, and the downstream system must support upsert).
3. Use external storage: have Flink write the data directly, or pre-aggregated, into an OLAP system (e.g. Doris/ClickHouse) and aggregate at read time (this needs a stable, reliable external store).

Do you run into this kind of problem when computing sliding windows with Flink? Are there better solutions?
Looking forward to your feedback.
best, 
tanjialiang.
---- Original message ----
From: Shammon FY
Date: 2023-05-29 09:08
Subject: Re: FlinkSQL: solutions for sliding windows with a large size and a small slide
Hi,

Is the problem that the amount of data emitted when the window fires is too large? Would adding resources and increasing the parallelism of the window computation mitigate it?

Best,
Shammon FY

On Fri, May 26, 2023 at 2:03 PM tanjialiang  wrote:

Hi, all.
I am running into problems with FlinkSQL window TVF sliding windows.
The slide is 5 minutes, the window size is 24 hours, grouped by user_id. When the job crashes or consumes from Kafka's earliest offset, checkpoints hardly ever succeed.
Consuming from earliest, data quickly fills the buffers and causes backpressure; one batch of input can then trigger N window firings downstream, each costing (number of users *
24 * 60 / 5), so the checkpoint barrier can get stuck indefinitely.
Is there any way out of this?


best,
tanjialiang.


Flink: error when writing to Kafka with exactly-once semantics

2023-05-28 Posted by lxk
The previous mail went out by mistake, so I am resending. The project writes to Kafka with exactly-once semantics; the code and configuration are below.

Sink code:
Properties producerProperties = MyKafkaUtil.getProducerProperties();
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setKafkaProducerConfig(producerProperties)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("streamx_flow_1261")
.build();

eventJsonStream.sinkTo(kafkaSink).setParallelism(14)
.name("event2kafka").uid("kafkasink");




Kafka producer configuration:
public static Properties getProducerProperties(){
Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers",
parameterTool.get("bootstrap.server"));
kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
kafkaProducerProps.setProperty("auto.offset.reset", "latest");
kafkaProducerProps.setProperty("session.timeout.ms", "5000");
kafkaProducerProps.setProperty("transaction.timeout.ms",12*6 +"");
kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
kafkaProducerProps.put("sasl.kerberos.service.name","kafka");

return kafkaProducerProps;

}


The job ran without problems for a long time, but recently it suddenly started failing with the following error:
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka 
-topic-2@-1 with 
FlinkKafkaInternalProducer{transactionalId='streamx_flow_1261-8-5', 
inTransaction=true, closed=false} 
at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The 
broker received an out of order sequence number.


I looked at this answer on Stack Overflow: https://stackoverflow.com/questions/55192852/transactional-producer-vs-just-idempotent-producer-java-exception-outoforderseq
but none of the parameters it mentions are set in my job; everything uses the defaults, so in theory this should not happen. Does anyone have an idea, or is something wrong or missing in my configuration?
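Not a diagnosis of the OutOfOrderSequenceException above, but one setting worth double-checking with DeliveryGuarantee.EXACTLY_ONCE (a hedged sketch; the values are illustrative assumptions, and the transaction.timeout.ms value in the code above looks truncated in the archive): the producer's transaction.timeout.ms must be larger than the checkpoint interval and must not exceed the broker's transaction.max.timeout.ms, which defaults to 15 minutes.

import java.util.Properties;

// Sketch: producer properties for an exactly-once KafkaSink (illustrative values only).
public class ProducerPropsSketch {
    public static Properties exactlyOnceProducerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        // Must exceed the Flink checkpoint interval and stay <= the broker's
        // transaction.max.timeout.ms (15 minutes by default).
        props.setProperty("transaction.timeout.ms", String.valueOf(10 * 60 * 1000));
        return props;
    }
}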



Flink: error when writing to Kafka with exactly-once semantics

2023-05-28 Posted by lxk
[Incomplete duplicate of the message above, sent by mistake; the code it repeats is omitted.]

Re: FlinkSQL: solutions for sliding windows with a large size and a small slide

2023-05-28 Posted by Shammon FY
Hi,

Is the problem that the amount of data emitted when the window fires is too large? Would adding resources and increasing the parallelism of the window computation mitigate it?

Best,
Shammon FY

On Fri, May 26, 2023 at 2:03 PM tanjialiang  wrote:

> Hi, all.
> I am running into problems with FlinkSQL window TVF sliding windows.
> The slide is 5 minutes, the window size is 24 hours, grouped by user_id. When the job crashes or consumes from Kafka's earliest offset, checkpoints hardly ever succeed.
> Consuming from earliest, data quickly fills the buffers and causes backpressure; one batch of input can then trigger N window firings downstream, each costing (number of users *
> 24 * 60 / 5), so the checkpoint barrier can get stuck indefinitely.
> Is there any way out of this?
>
>
> best,
> tanjialiang.


Unsubscribe

2023-05-27 Posted by 曹佳清
Unsubscribe


Re: Flink RocketMQ Connector

2023-05-26 Posted by Feng Jin
hi casel

The Flink RocketMQ connector is maintained by the RocketMQ community; the project is at:
https://github.com/apache/rocketmq-flink. By default this version uses the DELIMIT message format (messages are treated as
Strings and split by a delimiter), so only the column delimiter of the message can be configured.


best,
feng


On Fri, May 26, 2023 at 7:44 PM casel.chen  wrote:

> Is there an official Flink RocketMQ connector, or do I need to build one myself? And what is the URL of the Flink ecosystem site (where users publish their own connectors, formats, and so on)?


Re: [ANNOUNCE] Apache Flink 1.16.2 released

2023-05-26 Posted by Jing Ge
Hi Weijie,

Thanks again for your effort. I was wondering if there were any obstacles
you had to overcome while releasing 1.16.2 and 1.17.1 that could lead us to
any improvement wrt the release process and management?

Best regards,
Jing

On Fri, May 26, 2023 at 4:41 PM Martijn Visser 
wrote:

> Thank you Weijie and those who helped with testing!
>
> On Fri, May 26, 2023 at 1:06 PM weijie guo 
> wrote:
>
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.16.2, which is the second bugfix release for the Apache
> > Flink 1.16 series.
> >
> >
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> > streaming applications.
> >
> >
> >
> > The release is available for download at:
> >
> > https://flink.apache.org/downloads.html
> >
> >
> >
> > Please check out the release blog post for an overview of the
> > improvements for this bugfix release:
> >
> > https://flink.apache.org/news/2023/05/25/release-1.16.2.html
> >
> >
> >
> > The full release notes are available in Jira:
> >
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352765
> >
> >
> >
> > We would like to thank all contributors of the Apache Flink community
> > who made this release possible!
> >
> >
> >
> > Feel free to reach out to the release managers (or respond to this
> > thread) with feedback on the release process. Our goal is to
> > constantly improve the release process. Feedback on what could be
> > improved or things that didn't go so well are appreciated.
> >
> >
> >
> > Regards,
> >
> > Release Manager
> >
>


Re: [ANNOUNCE] Apache Flink 1.17.1 released

2023-05-26 Posted by Jing Ge
Hi Weijie,

That is earlier than I expected! Thank you so much for your effort!

Best regards,
Jing

On Fri, May 26, 2023 at 4:44 PM Martijn Visser 
wrote:

> Same here as with Flink 1.16.2, thank you Weijie and those who helped with
> testing!
>
> On Fri, May 26, 2023 at 1:08 PM weijie guo 
> wrote:
>
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.17.1, which is the first bugfix release for the Apache Flink 1.17 
>> series.
>>
>>
>>
>>
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>>
>>
>>
>> The release is available for download at:
>>
>> https://flink.apache.org/downloads.html
>>
>>
>>
>>
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>>
>> https://flink.apache.org/news/2023/05/25/release-1.17.1.html
>>
>>
>>
>> The full release notes are available in Jira:
>>
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352886
>>
>>
>>
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>>
>>
>>
>> Feel free to reach out to the release managers (or respond to this thread) 
>> with feedback on the release process. Our goal is to constantly improve the 
>> release process. Feedback on what could be improved or things that didn't go 
>> so well are appreciated.
>>
>>
>>
>> Regards,
>>
>> Release Manager
>>
>


Re: [ANNOUNCE] Apache Flink 1.17.1 released

2023-05-26 Posted by Martijn Visser
Same here as with Flink 1.16.2, thank you Weijie and those who helped with
testing!

On Fri, May 26, 2023 at 1:08 PM weijie guo 
wrote:

>
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.17.1, which is the first bugfix release for the Apache Flink 1.17 
> series.
>
>
>
>
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
>
>
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
>
> https://flink.apache.org/news/2023/05/25/release-1.17.1.html
>
>
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352886
>
>
>
>
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
>
>
>
>
> Feel free to reach out to the release managers (or respond to this thread) 
> with feedback on the release process. Our goal is to constantly improve the 
> release process. Feedback on what could be improved or things that didn't go 
> so well are appreciated.
>
>
>
> Regards,
>
> Release Manager
>


Re: [ANNOUNCE] Apache Flink 1.16.2 released

2023-05-26 Posted by Martijn Visser
Thank you Weijie and those who helped with testing!

On Fri, May 26, 2023 at 1:06 PM weijie guo 
wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.16.2, which is the second bugfix release for the Apache
> Flink 1.16 series.
>
>
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
>
> https://flink.apache.org/news/2023/05/25/release-1.16.2.html
>
>
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352765
>
>
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
>
>
> Feel free to reach out to the release managers (or respond to this
> thread) with feedback on the release process. Our goal is to
> constantly improve the release process. Feedback on what could be
> improved or things that didn't go so well are appreciated.
>
>
>
> Regards,
>
> Release Manager
>


Flink RocketMQ Connector

2023-05-26 Posted by casel.chen
Is there an official Flink RocketMQ connector, or do I need to build one myself? And what is the URL of the Flink ecosystem site (where users publish their own connectors, formats, and so on)?

[ANNOUNCE] Apache Flink 1.17.1 released

2023-05-26 Posted by weijie guo
The Apache Flink community is very happy to announce the release of
Apache Flink 1.17.1, which is the first bugfix release for the Apache
Flink 1.17 series.



Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.



The release is available for download at:

https://flink.apache.org/downloads.html



Please check out the release blog post for an overview of the
improvements for this bugfix release:

https://flink.apache.org/news/2023/05/25/release-1.17.1.html



The full release notes are available in Jira:

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352886



We would like to thank all contributors of the Apache Flink community
who made this release possible!



Feel free to reach out to the release managers (or respond to this
thread) with feedback on the release process. Our goal is to
constantly improve the release process. Feedback on what could be
improved or things that didn't go so well are appreciated.



Regards,

Release Manager


Re: Frequent full GCs with G1 GC on flink1.17 + jdk11

2023-05-26 Posted by Yuxin Tan
hi, yidan

Apart from the JVM options, is the rest of the Flink configuration completely identical? For example, has the state backend changed?

Also, are you on the latest jdk11 patch release? If not, it is worth trying the latest one.

If you already run the latest jdk11, you could check whether another GC algorithm shows the same problem, e.g. -XX:+UseParallelGC
-XX:NewRatio=3 -XX:ParallelGCThreads=4 -XX:CICompilerCount=4
-XX:-CompactStrings

Best,
Yuxin


yidan zhao wrote on Fri, May 26, 2023 at 17:39:

> We recently upgraded Flink from 1.15.2 to 1.17.0 and the JDK from 8 to 11, and now see a large number of full GCs.
> Analysis shows they are mainly caused by System.gc(), which we traced to the netty code used by the redisson library allocating DirectMemory:
> when direct memory runs short, System.gc() is called frequently and triggers full GCs.
> My question: comparative tests show jdk8 + flink1.17 is fine while jdk11 + flink1.17 has the problem. Does anyone know why?
>
> Additional information:
>
> Both the jdk8 and jdk11 setups use G1 GC with identical VM options and the same max direct-memory limit, yet jinfo and other tools show that direct memory usage is indeed lower under jdk8.
>


Re: How can Flink SQL keep a running count of today's transactions, updated and emitted downstream on every transaction?

2023-05-26 Posted by Shammon FY
Hi

Build the key from the day-level time plus the other fields you aggregate on and use an aggregation operator; by default it emits an updated result downstream after every incoming record.
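A minimal sketch of that idea (my own illustration; the table and column names are assumptions): group by the transaction day, and the unbounded aggregation emits an updated count for every incoming row, so the downstream sink has to accept upserts or retractions.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: per-day running transaction count, updated on every incoming record.
public class DailyCountSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // "transactions" is an assumed source table with a TIMESTAMP(3) column tx_time.
        tEnv.executeSql(
                "SELECT DATE_FORMAT(tx_time, 'yyyy-MM-dd') AS dt, COUNT(*) AS tx_cnt " +
                "FROM transactions " +
                "GROUP BY DATE_FORMAT(tx_time, 'yyyy-MM-dd')")
            .print();
    }
}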

Best,
Shammon FY

On Fri, May 26, 2023 at 3:44 PM casel.chen  wrote:

> How can Flink SQL keep a running count of today's transactions that is updated and emitted downstream on every incoming transaction?


Frequent full GCs with G1 GC on flink1.17 + jdk11

2023-05-26 Posted by yidan zhao
We recently upgraded Flink from 1.15.2 to 1.17.0 and the JDK from 8 to 11, and now see a large number of full GCs.
Analysis shows they are mainly caused by System.gc(), which we traced to the netty code used by the redisson library allocating DirectMemory:
when direct memory runs short, System.gc() is called frequently and triggers full GCs.
My question: comparative tests show jdk8 + flink1.17 is fine while jdk11 + flink1.17 has the problem. Does anyone know why?

Additional information:
Both the jdk8 and jdk11 setups use G1 GC with identical VM options and the same max direct-memory limit, yet jinfo and other tools show that direct memory usage is indeed lower under jdk8.


How can Flink SQL keep a running count of today's transactions, updated and emitted downstream on every transaction?

2023-05-26 Posted by casel.chen
How can Flink SQL keep a running count of today's transactions that is updated and emitted downstream on every incoming transaction?

FlinkSQL: solutions for sliding windows with a large size and a small slide

2023-05-26 Posted by tanjialiang
Hi, all.
I am running into problems with FlinkSQL window TVF sliding windows.
The slide is 5 minutes, the window size is 24 hours, grouped by user_id. When the job crashes or consumes from Kafka's earliest offset, checkpoints hardly ever succeed.
Consuming from earliest, data quickly fills the buffers and causes backpressure; one batch of input can then trigger N window firings downstream, each costing (number of users * 24 *
60 / 5), so the checkpoint barrier can get stuck indefinitely.
Is there any way out of this?
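For readers who want to see the shape of the query being discussed, a sketch of my own with assumed table and column names, using the window TVF syntax:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: a 24 h window sliding every 5 min, per user_id ("events" and its columns are assumptions).
public class SlidingWindowSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "SELECT user_id, window_start, window_end, COUNT(*) AS cnt " +
                "FROM TABLE(HOP(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTES, INTERVAL '24' HOURS)) " +
                "GROUP BY user_id, window_start, window_end")
            .print();
        // Each incoming row belongs to 24 * 60 / 5 = 288 overlapping windows, which is the
        // per-record merge cost discussed in this thread.
    }
}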


best, 
tanjialiang.

Re: Conditions for a Flink window to fire

2023-05-25 Posted by 小昌同学
A follow-up question: does "if the data is the same, the watermark does not advance and the window does not fire" mean that identical records carry the same timestamp, so the watermark never reaches the level needed to fire the window?
Two more questions:
1. Does closing an event-time window depend on the timestamp carried by the next record, i.e. the window only fires once a record arrives whose timestamp is greater than the previous window's end time? If so, how does the very last window ever get triggered?
2. I would like to print a window's start and end time with the DataStream API. Which API should I use for that? (A sketch follows below.)
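On question 2, a sketch of my own (assuming a keyed stream of Strings): inside a ProcessWindowFunction the window's start and end are available through context.window().

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: emit each window's [start, end) together with its element count.
public class PrintWindowBounds extends ProcessWindowFunction<String, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<String> elements, Collector<String> out) {
        long count = 0;
        for (String ignored : elements) { count++; }
        TimeWindow w = context.window();
        out.collect("key=" + key + " window=[" + w.getStart() + ", " + w.getEnd() + ") count=" + count);
    }
}

It would be attached after the window assigner, e.g. .window(TumblingEventTimeWindows.of(Time.minutes(1))).process(new PrintWindowBounds()).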


小昌同学 <ccc0606fight...@163.com>
---- Original message ----
From: lxk
Date: 2023-05-25 10:14
Subject: Re: Re: Conditions for a Flink window to fire
Hi, you can start with the official documentation on event time and watermarks:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/
If you send many records but they are all identical, the watermark does not advance and the window will not fire.

On 2023-05-25 10:00:36, "小昌同学" wrote:
Yes, I sent a lot of data and the window still did not fire.


小昌同学 <ccc0606fight...@163.com>
---- Original message ----
From: yidan zhao
Date: 2023-05-25 09:59
Subject: Re: Conditions for a Flink window to fire
If you only send a single record, the watermark will not advance and the window computation will not fire. You need more data.

小昌同学 wrote on Thu, May 25, 2023 at 09:32:

[quoted question and RytLogAnly9 code omitted; identical to the original message "Conditions for a Flink window to fire" below]
Re: Re: Conditions for a Flink window to fire

2023-05-24 Posted by lxk
Hi, you can start with the official documentation on event time and watermarks:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/
If you send many records but they are all identical, the watermark does not advance and the window will not fire.

On 2023-05-25 10:00:36, "小昌同学" wrote:
>Yes, I sent a lot of data and the window still did not fire.
>
>
>小昌同学 <ccc0606fight...@163.com>
> ---- Original message ----
>From: yidan zhao
>Date: 2023-05-25 09:59
>Subject: Re: Conditions for a Flink window to fire
>If you only send a single record, the watermark will not advance and the window computation will not fire. You need more data.
>
>小昌同学 wrote on Thu, May 25, 2023 at 09:32:
>
>[quoted question and RytLogAnly9 code omitted; identical to the original message "Conditions for a Flink window to fire" below]

Re: Conditions for a Flink window to fire

2023-05-24 Posted by 小昌同学
Yes, I sent a lot of data and the window still did not fire.


小昌同学 <ccc0606fight...@163.com>
---- Original message ----
From: yidan zhao
Date: 2023-05-25 09:59
Subject: Re: Conditions for a Flink window to fire
If you only send a single record, the watermark will not advance and the window computation will not fire. You need more data.

小昌同学 wrote on Thu, May 25, 2023 at 09:32:

[quoted question and RytLogAnly9 code omitted; identical to the original message "Conditions for a Flink window to fire" below]

Re: Conditions for a Flink window to fire

2023-05-24 Posted by yidan zhao
If you only send a single record, the watermark will not advance and the window computation will not fire. You need more data.
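One way to check whether the watermark is actually advancing (a sketch of my own, assuming a stream of Strings placed after assignTimestampsAndWatermarks): print the watermark the operator currently sees for every element.

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: log the element timestamp and the current watermark as records pass through.
public class WatermarkProbe extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect("element ts=" + ctx.timestamp()
                + " currentWatermark=" + ctx.timerService().currentWatermark());
    }
}

Usage would be stream.process(new WatermarkProbe()).print(); if the watermark stays at Long.MIN_VALUE, no event-time window can fire.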

小昌同学 wrote on Thu, May 25, 2023 at 09:32:
>
> [quoted question and RytLogAnly9 code omitted; identical to the original message "Conditions for a Flink window to fire" below]

Conditions for a Flink window to fire

2023-05-24 Posted by 小昌同学
Hi all, a question about when Flink event-time windows actually fire.
The window is TumblingEventTimeWindows(Time.minutes(1L)), the event time is taken from System.currentTimeMillis(), and the watermark allows 2 seconds of out-of-orderness.
But after I send one record, even five minutes later the window still has not fired. Could someone help me find the problem in the program?
The relevant code and sample data follow:
|
package job;
import bean.MidInfo3;
import bean.Result;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import utils.DateUtil;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly9 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
//1. Consume the data from Kafka
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new 
FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
sourceStream.print("最源端的数据sourceStream");

//2. Process the raw records, extract the fields we need and build BaseInfo2 objects
SingleOutputStreamOperator<BaseInfo2> baseInfoStream = sourceStream.map(new 
MapFunction<String, BaseInfo2>() {
@Override
public BaseInfo2 map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//the server IP
String serverIp = jsonObject.getString("ip");
//the "data" payload
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//time, used to match the request and the response of the same num
String time = splits[0].substring(7, 19).replace("-", "").trim();
//subData, used to tell whether this is a request or a response
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, 
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, 
System.currentTimeMillis());
}

}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element,
 recordTimestamp) -> element.getEvenTime()));
baseInfoStream.print("不加功能描述的 baseInfoStream");

//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
SingleOutputStreamOperator<BaseInfo2> completeInfoStream = 
baseInfoStream.map(new MapFunction<BaseInfo2, BaseInfo2>() {
@Override
public BaseInfo2 map(BaseInfo2 value) throws Exception {
//拿到数据中携带的数字的action
String actionId = value.getFuncId();
System.out.println("数据中的action编码是: " + actionId);
String actionName = null;

关于Table API 或 SQL 如何设置水印的疑问?

2023-05-24 文章 ZhaoShuKang
各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我在官网看到Table API & SQL 
设置事件时间有三种方式:
1、在 DDL 中定义
2、在 DataStream 到 Table 转换时定义
3、使用 TableSource 定义
而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印?
另外说明,我的第一版代码是将Table转换为DataStream,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中 
toDataStream()方法的注释上也说“表生态系统的类型系统比DataStream API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。
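如果后续能直接在 SQL 层面解决,第 1 种方式(在 DDL 中定义)的写法大致如下,仅作语法示意,其中表名、连接器和字段都是假设的,并非我现在的 Hive 表:

// 仅为语法示意:在 DDL 中声明事件时间列和水印(表名、连接器、字段均为假设)
tableEnv.executeSql(
        "CREATE TABLE demo_events (" +
        "  device_id STRING," +
        "  amount DOUBLE," +
        "  dt TIMESTAMP(3)," +
        "  WATERMARK FOR dt AS dt - INTERVAL '3' SECOND" +   // 允许 3 秒乱序
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'demo_topic'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'format' = 'json'" +
        ")");
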
以下是我的第一版代码
// flink 集成 hive
System.out.println("初始化Flink环境");
String hiveVersion = "3.1.2";
String catalogName = "myhive";
String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302";
String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf";
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

System.out.println("定义hive环境");
// 定义 hive catalog 参数:catalog名称、数据库名称、对象名称
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, 
hiveVersion);
tableEnv.registerCatalog(catalogName, hive);

// 将 HiveCatalog 设置为 session 的当前 catalog
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
// 设置 hive 并行度
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setInteger("table.exec.hive.infer-source-parallelism.max", 
sourceParallelism); // Default 1000

// 使用 HiveTableSource
System.out.println("定义查询条件");
// 定义查询条件
Table table = tableEnv
.from(catalogName + "." + databaseName + "." + tableName)
.select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + DAY 
+ "," + HOUR)
.filter($(YEAR).isEqual(year))
.filter($(MONTH).isEqual(startMonth))
.filter($(DAY).isGreaterOrEqual(startDay))
.filter($(HOUR).isGreaterOrEqual(startHour))
.filter($(DAY).isLessOrEqual(endDay))
.filter($(HOUR).isLessOrEqual(endHour));
tableEnv.createTemporaryView("myTable", table);

// Table 转 Stream,非常耗时
System.out.println("Table to Stream");
DataStream<Row> resultStream = tableEnv.toDataStream(table);
// 水印及窗口设置
System.out.println("水印及窗口");
resultStream
.assignTimestampsAndWatermarks(WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((SerializableTimestampAssigner<Row>) 
(element, recordTimestamp) -> {
long datetime = 0;
try {
datetime = new SimpleDateFormat(DATEFORMAT)
.parse(element.getFieldAs(DATETIME).toString())
.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return datetime;
}))
.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
| |
ZhaoShuKang
|
|
chuckzha...@163.com
|

Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released

2023-05-24 文章 huang huang
退订

Maximilian Michels  于2023年5月23日周二 10:12写道:

> Niceee. Thanks for managing the release, Gyula!
>
> -Max
>
> On Wed, May 17, 2023 at 8:25 PM Márton Balassi 
> wrote:
> >
> > Thanks, awesome! :-)
> >
> > On Wed, May 17, 2023 at 2:24 PM Gyula Fóra  wrote:
> >>
> >> The Apache Flink community is very happy to announce the release of
> Apache Flink Kubernetes Operator 1.5.0.
> >>
> >> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
> >>
> >> Release highlights:
> >>  - Autoscaler improvements
> >>  - Operator stability, observability improvements
> >>
> >> Release blogpost:
> >>
> https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/
> >>
> >> The release is available for download at:
> https://flink.apache.org/downloads.html
> >>
> >> Maven artifacts for Flink Kubernetes Operator can be found at:
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
> >>
> >> Official Docker image for Flink Kubernetes Operator applications can be
> found at: https://hub.docker.com/r/apache/flink-kubernetes-operator
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/projects/FLINK/versions/12352931
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> >>
> >> Regards,
> >> Gyula Fora
>


退订

2023-05-24 文章 梁猛
退订


| |
梁猛
|
|
cdt...@163.com
|

Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released

2023-05-23 文章 Maximilian Michels
Niceee. Thanks for managing the release, Gyula!

-Max

On Wed, May 17, 2023 at 8:25 PM Márton Balassi  wrote:
>
> Thanks, awesome! :-)
>
> On Wed, May 17, 2023 at 2:24 PM Gyula Fóra  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink Kubernetes Operator 1.5.0.
>>
>> The Flink Kubernetes Operator allows users to manage their Apache Flink 
>> applications and their lifecycle through native k8s tooling like kubectl.
>>
>> Release highlights:
>>  - Autoscaler improvements
>>  - Operator stability, observability improvements
>>
>> Release blogpost:
>> https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/
>>
>> The release is available for download at: 
>> https://flink.apache.org/downloads.html
>>
>> Maven artifacts for Flink Kubernetes Operator can be found at: 
>> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>>
>> Official Docker image for Flink Kubernetes Operator applications can be 
>> found at: https://hub.docker.com/r/apache/flink-kubernetes-operator
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12352931
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>> Regards,
>> Gyula Fora


回复:table api定义rowtime未生效

2023-05-23 文章 小昌同学
??
| {"ip":"10.125.8.111","data":": -- 14:28:05.111 -- 
<44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=cccf7olmuqbAABLOgVTU/3lQOcAAAClBQAAAP9ZAACQHACQHAAAdGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}


{"ip":"10.125.8.139","data":": -- 14:28:05.111 -- 
<44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=cccf7olmuqbAABLOgVTU/3lQOcAAAClBQAAAP9ZAACQHACQHAAAdGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
 |


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | L Y<531599...@qq.com.INVALID> |
| 发送日期 | 2023年5月23日 01:25 |
| 收件人 | user-zh |
| 主题 | 回复:table api定义rowtime未生效 |
HI??


DataStream

回复:table api定义rowtime未生效

2023-05-23 文章 小昌同学
??
1
2
??debug??;
 
??15956076613
|
//使用侧输出流
OutputTag<BaseInfo2> requestStream = new OutputTag<BaseInfo2>("requestStream") {
};
OutputTag<BaseInfo2> answerStream = new OutputTag<BaseInfo2>("answerStream") {
};

SingleOutputStreamOperator<BaseInfo2> tagStream = 
completeInfoStream.process(new MyProcessFunction2(requestStream, answerStream));
DataStream<BaseInfo2> requestDataStream = 
tagStream.getSideOutput(requestStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) -> 
element.getEvenTime()));
DataStream<BaseInfo2> answerDataStream = 
tagStream.getSideOutput(answerStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) -> 
element.getEvenTime()));
|


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | L Y<531599...@qq.com.INVALID> |
| 发送日期 | 2023年5月23日 01:25 |
| 收件人 | user-zh |
| 主题 | 回复:table api定义rowtime未生效 |
HI??


DataStream

package job;

import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import function.MyProcessFunction2;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author 昌
 * @Time 2023/5/10 8:32
 * 使用 flink process 方法对一条流进行拆分,考虑到状态设置的问题,采用渐进式累计窗口进行计算
 * 先将两条流转换后的表简单地关联在一起,关联的时候加上一个事件时间(row_time as cast(CURRENT_TIMESTAMP AS timestamp(3)))
 * 之后再使用渐进式窗口进行计算
 */
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//1、连接测试环境kafka的数据
String servers = 
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new 
FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
sourceStream.print("sourceStream");
//{"ip":"10.125.8.141","data":"应答: -- 14:28:05.111 -- 

Re: 提问

2023-05-22 文章 Leonard Xu
(1)可以检查下是不是其他作业或者同步工具使用了对应的server-id
(2) server-id 可以尝试用机器IP+时间戳来生成,这样能尽可能避免冲突
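例如用 SQL 方式接 MySQL CDC 表时,可以直接把 server-id 配成一个足够大的区间,区间长度不小于该 source 的并行度,下面只是一个示意(库表名、账号等参数均为假设):

// 仅为示意:'server-id' 配成区间,区间长度应不小于 source 并行度(库表与账号均为假设)
tableEnv.executeSql(
        "CREATE TABLE mysql_source (" +
        "  id BIGINT," +
        "  name STRING," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'mysql-cdc'," +
        "  'hostname' = 'localhost'," +
        "  'port' = '3306'," +
        "  'username' = 'flink'," +
        "  'password' = '******'," +
        "  'database-name' = 'demo_db'," +
        "  'table-name' = 'demo_table'," +
        "  'server-id' = '5400-5408'" +
        ")");
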

祝好,
雪尽

> On May 22, 2023, at 3:34 PM, 曹明勤  wrote:
> 
> 在我提交的flink-cdc-mysql的任务中,需要flink同步多张表的数据,但是我遇到了server-id重复的问题。我尝试过设置随机数,但是server-id有一定的取值范围,并且随机数还是有可能重复。官方文档建议我将server-id设置为一个范围,比如5400-6400,并且设置flink的并行度。这些我都做了,但是当我同步表的数量较多时,还是会出现server-id重复的问题导致任务提交失败。我需要如何设置才能如何避免这种错误?
> 
> 
> 
> 
> In the Flinks-cdc-mysql task I submitted, flink was required to synchronize 
> data of multiple tables, but I encountered the problem of server-id 
> duplication. I tried to set a random number, but server-id has a range of 
> values, and random numbers can be repeated. The official documentation 
> advised me to set server-id to a range, such as 5400-6400, and set flink's 
> parallelism. I did all of this, but when I synchronized a large number of 
> tables, I still had the problem of server-id duplication, which caused the 
> task submission to fail. What do I need to set up to avoid this error?



提问

2023-05-22 文章 曹明勤
在我提交的flink-cdc-mysql的任务中,需要flink同步多张表的数据,但是我遇到了server-id重复的问题。我尝试过设置随机数,但是server-id有一定的取值范围,并且随机数还是有可能重复。官方文档建议我将server-id设置为一个范围,比如5400-6400,并且设置flink的并行度。这些我都做了,但是当我同步表的数量较多时,还是会出现server-id重复的问题导致任务提交失败。我需要如何设置才能如何避免这种错误?




In the Flinks-cdc-mysql task I submitted, flink was required to synchronize 
data of multiple tables, but I encountered the problem of server-id 
duplication. I tried to set a random number, but server-id has a range of 
values, and random numbers can be repeated. The official documentation advised 
me to set server-id to a range, such as 5400-6400, and set flink's parallelism. 
I did all of this, but when I synchronized a large number of tables, I still 
had the problem of server-id duplication, which caused the task submission to 
fail. What do I need to set up to avoid this error?

回复:table api定义rowtime未生效

2023-05-21 文章 小昌同学
flink1.14


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | L Y<531599...@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh |
| 主题 | 回复:table api定义rowtime未生效 |
HI??
??midStream??midStream
??


SingleOutputStreamOperator

回复:table api定义rowtime未生效

2023-05-21 文章 小昌同学
以下是生成 midStream 的相关代码:
|
//6 
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//1?? ?? 
??
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
//tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
//tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), 
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), 
$("eventTime").rowtime().as("et"));
//Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, 
Schema.newBuilder()
//.column("funcId", DataTypes.STRING())
//.column("serverIp", DataTypes.STRING())
//.column("outTime", DataTypes.BIGINT())
//.column("handleSerialNo", DataTypes.STRING())
//.column("info", DataTypes.STRING())
//.column("funcIdDesc", DataTypes.STRING())
//.column("eventTime", DataTypes.TIMESTAMP(3))
//.watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//.build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), 
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), 
$("eventTime").rowtime());
//Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, 
Schema.newBuilder()
//.column("funcId", DataTypes.STRING())
//.column("serverIp", DataTypes.STRING())
//.column("outTime", DataTypes.BIGINT())
//.column("handleSerialNo", DataTypes.STRING())
//.column("info", DataTypes.STRING())
//.column("funcIdDesc", DataTypes.STRING())
//.column("eventTime", DataTypes.TIMESTAMP(3))
//.watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//.build());


Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et  as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("??resultTable" + result);
result.printSchema();
tableEnv.createTemporaryView("resultTable", result);

DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, 
MidInfo.class);
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), 
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), 
$("et").rowtime())
.select($("funcId"), $("funcIdDesc"), $("serverIp"), 
$("maxTime"), $("minTime"), $("pk"), $("et"));
midTable.printSchema();
tableEnv.createTemporaryView("midTable1", midTable);

//TVF
Table resulTable = tableEnv.sqlQuery("SELECT 
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
resulTable.printSchema();
|



|
package job;

import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import 

table.exec.source.cdc-events-duplicate参数问题

2023-05-18 文章 casel.chen
mysql binlog 操作记录发到 kafka topic 中,消息格式是canal json,现通过flink 
sql实时同步写入另一个mysql库。今天发现实时作业抛错说写入mysql时遇到duplicate key error,查了一下发现是kafka 
topic中存在两条相同的消息,即相同主键且都是INSERT操作的消息。请问这种情况有什么办法可以避免作业出错吗?


查了官方文档说要在作业中添加参数 table.exec.source.cdc-events-duplicate 
,相当于是在作业中添加了一个状态算子用于去重,如果这张表不同主键的记录非常多的话,岂不是让其状态很占内存?而作业本身如果配置了状态过期参数,会不会造成无法精准去重?谢谢!
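补充一下我查到的开启方式,大致是下面这样(仅为示意,按文档该参数生效的前提是源表上声明了 PRIMARY KEY):

// 仅为示意:开启去重参数,框架会按主键生成一个去重用的状态算子
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setBoolean("table.exec.source.cdc-events-duplicate", true);

如果是 SQL Client 提交,也可以用 SET 'table.exec.source.cdc-events-duplicate' = 'true'; 的方式设置。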


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/#duplicate-change-events

Re: 使用flink sql创建版本视图无法正常使用

2023-05-17 文章 Shammon FY
Hi,

你邮件里的图片无法显示,也没办法看到具体的错误信息

Best,
Shammon FY


On Thu, May 18, 2023 at 10:15 AM arkey w  wrote:

> flink版本:1.14.5
> 在项目使用版本表时,准备使用版本视图,但创建后无法正常使用。后根据官网提供的示例(  Versioned Tables | Apache Flink
> 
> )进行验证也同样无法使用,创建sql如下:
> 创建事实表:
> [image: image.png]
>
> 创建版本视图:
> [image: image.png]
> [image: image.png]
>
>
> Temporal Join的结果出现了报错:
> [image: image.png]
>
> 在desc视图的时候发现视图并没有主键以及事件时间字段,而join的时候也因此报了错。
> 是我操作哪里有问题吗,要如何才能正确使用版本视图?
>
>


使用flink sql创建版本视图无法正常使用

2023-05-17 文章 arkey w
flink版本:1.14.5
在项目使用版本表时,准备使用版本视图,但创建后无法正常使用。后根据官网提供的示例(  Versioned Tables | Apache Flink

)进行验证也同样无法使用,创建sql如下:
创建事实表:
[image: image.png]

创建版本视图:
[image: image.png]
[image: image.png]


Temporal Join的结果出现了报错:
[image: image.png]

在desc视图的时候发现视图并没有主键以及事件时间字段,而join的时候也因此报了错。
是我操作哪里有问题吗,要如何才能正确使用版本视图?


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released

2023-05-17 文章 Márton Balassi
Thanks, awesome! :-)

On Wed, May 17, 2023 at 2:24 PM Gyula Fóra  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Kubernetes Operator 1.5.0.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
>
> Release highlights:
>  - Autoscaler improvements
>  - Operator stability, observability improvements
>
> Release blogpost:
>
> https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Kubernetes Operator can be found at:
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>
> Official Docker image for Flink Kubernetes Operator applications can be
> found at: https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12352931
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Gyula Fora
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released

2023-05-17 文章 Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.5.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Autoscaler improvements
 - Operator stability, observability improvements

Release blogpost:
https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352931

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


flink sql case when 中文数据写入doris出现乱码

2023-05-17 文章 casel.chen
使用flink sql写mysql表数据到doris表,发现case 
when语句判断交易类型使用了中文,写入后在doris查出是乱码,而mysql其他中文字段写入是正确的,想问一下这个sql中出现的乱码问题要怎么解决?

table api定义rowtime未生效

2023-05-16 文章 小昌同学
各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), 
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), 
$("eventTime").rowtime());
tableEnv.createTemporaryView("midTable1", midTable);
Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
        "FROM TABLE(CUMULATE(\n" +
        " TABLE midTable1" +
        //" TABLE "+ midTable +
        " , DESCRIPTOR(eventTime)\n" +
        " , INTERVAL '60' SECOND\n" +
        " , INTERVAL '1' DAY))\n" +
        " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime 
timestamp is not defined. Please make sure that a proper TimestampAssigner is 
defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
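一个还没验证的思路是先给 midStream 显式分配时间戳和水印,再声明 rowtime,大致如下(midStream 按 MidInfo 类型假设,取毫秒时间戳的 getMinTime() 也是假设的):

// 仅为示意:先在 DataStream 上分配时间戳和水印,再把 rowtime 声明出来(getMinTime() 返回毫秒时间戳,为假设)
DataStream<MidInfo> midWithTs = midStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
                .withTimestampAssigner((element, recordTimestamp) -> element.getMinTime()));
Table midTable = tableEnv.fromDataStream(midWithTs, $("funcId"), $("funcIdDesc"),
        $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());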
| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: 回复:报错显示为bug

2023-05-16 文章 小昌同学
好滴呀  谢谢各位老师


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年5月16日 08:46 |
| 收件人 |  ,
 |
| 主题 | Re: 回复:报错显示为bug |
Hi,

从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

你好,从报错来看是类型不兼容导致的。
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换

















At 2023-05-15 18:29:15, "小昌同学"  wrote:
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//使用侧输出流
OutputTag requestStream = new
OutputTag("requestStream") {
};
OutputTag answerStream = new
OutputTag("answerStream") {
};

//1、连接测试环境kafka的数据
String servers =
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName =
FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource sourceStream = env.addSource(new
FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
<315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator baseInfoStream =
sourceStream.map(new MapFunction() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp,
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});

//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator tagStream =
baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

//4、根据不同的tag进行不同的输出流设定
DataStream requestDataStream =
tagStream.getSideOutput(requestStream);
DataStream answerDataStream =
tagStream.getSideOutput(answerStream);

requestDataStream.print("requestDataStream");

Re: 回复:报错显示为bug

2023-05-15 文章 Shammon FY
Hi,

从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段
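比如 eventTime 如果实际上是 String,可以先在查询里把它显式转换成 TIMESTAMP 再参与后续计算,下面是一个示意(表名 midTable1 和格式串均为假设):

// 仅为示意:把字符串时间列显式转换成 TIMESTAMP(3),避免 codegen 时按字符串强转
Table fixed = tableEnv.sqlQuery(
        "SELECT funcId, TO_TIMESTAMP(eventTime, 'yyyy-MM-dd HH:mm:ss') AS rowtime " +
        "FROM midTable1");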

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

> 你好,从报错来看是类型不兼容导致的。
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
> 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> 可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-05-15 18:29:15, "小昌同学"  wrote:
> >|
> >package job;
> >import bean.BaseInfo;
> >import bean.MidInfo;
> >import bean.OutInfo;
> >import bean.ResultInfo;
> >import com.alibaba.fastjson.JSON;
> >import com.alibaba.fastjson.JSONObject;
> >import config.FlinkConfig;
> >import function.MyProcessFunction;
> >import org.apache.flink.api.common.functions.MapFunction;
> >import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >import org.apache.flink.api.java.tuple.Tuple2;
> >import org.apache.flink.streaming.api.TimeCharacteristic;
> >import org.apache.flink.streaming.api.datastream.DataStream;
> >import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >import org.apache.flink.table.api.DataTypes;
> >import org.apache.flink.table.api.Schema;
> >import org.apache.flink.table.api.Table;
> >import org.apache.flink.table.api.TableSchema;
> >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >import org.apache.flink.table.types.DataType;
> >import org.apache.flink.util.OutputTag;
> >import sink.Sink2Mysql;
> >import utils.DateUtil;
> >import utils.DateUtils;
> >import utils.JdbcUtil;
> >
> >import java.sql.Connection;
> >import java.sql.PreparedStatement;
> >import java.sql.ResultSet;
> >import java.time.*;
> >import java.util.Date;
> >import java.util.HashMap;
> >import java.util.Properties;
> >
> >public class RytLogAnly4 {
> >public static void main(String[] args) throws Exception {
> >StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >//使用侧输出流
> >OutputTag requestStream = new
> OutputTag("requestStream") {
> >};
> >OutputTag answerStream = new
> OutputTag("answerStream") {
> >};
> >
> >//1、连接测试环境kafka的数据
> >String servers =
> FlinkConfig.config.getProperty("dev_bootstrap.servers");
> >String topicName =
> FlinkConfig.config.getProperty("dev_topicName");
> >String groupId = FlinkConfig.config.getProperty("dev_groupId");
> >String devMode = FlinkConfig.config.getProperty("dev_mode");
> >Properties prop = new Properties();
> >prop.setProperty("bootstrap.servers", servers);
> >prop.setProperty("group.id", groupId);
> >prop.setProperty("auto.offset.reset", devMode);
> >DataStreamSource sourceStream = env.addSource(new
> FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
> >//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
> <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
> >
> >//2、对源数据进行处理,生成baseInfo基类的数据
> >SingleOutputStreamOperator baseInfoStream =
> sourceStream.map(new MapFunction() {
> >@Override
> >public BaseInfo map(String value) throws Exception {
> >JSONObject jsonObject = JSON.parseObject(value);
> >//获取到不同的服务器IP
> >String serverIp = jsonObject.getString("ip");
> >//获取到不同的data的数据
> >String datas = jsonObject.getString("data");
> >
> >String[] splits = datas.split("\n");
> >HashMap dataMap = new HashMap<>();
> >//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
> >String time = splits[0].substring(7, 19);
> >//将subData填充到自定义类型中,用来判断时请求还是应答
> >String subData = datas.substring(0, 10);
> >for (int i = 0; i < splits.length; i++) {
> >if (splits[i].contains("=")) {
> >splits[i] = splits[i].replaceFirst("=", "&");
> >String[] temp = splits[i].split("&");
> >if (temp.length > 1) {
> >dataMap.put(temp[0].toLowerCase(), temp[1]);
> >}
> >}
> >

Re:Re: Flink提交作业是否可以跳过上传作业jar包这一步?

2023-05-15 文章 casel.chen
Application Mode没有这个问题,现在是Session Mode提交作业会遇到这个问题
./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar




后面这个作业TopSpeedWindowing.jar包可以使用hdfs/oss路径指定吗?如果是分布式文件路径的话是不是就不用上传作业jar包到jobManager了,而是由jobManager自行下载?





在 2023-05-15 19:27:21,"shimin huang"  写道:
>可以考虑基于flink-kubernetes依赖下的KubernetesClusterDescriptor来启动任务,可以参考https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java
> 
>
>> 2023年5月15日 19:21,casel.chen  写道:
>> 
>> 我们开发了一个实时计算平台提交flink 
>> sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat
>>  
>> jar有几百MB,又因为平台所在的k8s集群和作业真正运行的k8s集群是不同的,某些集群的跨k8s离群网络传输开销(跨地区甚至跨云厂商)比较大,而且这个flinksql.jar我们已经放到了k8s镜像当中,jobManager加载镜像后是可以在本地找到该jar的,所以想问一下Flink提交作业是否可以跳过上传作业jar包这一步?有没有参数指定直接去本地加载该作业jar包?这样的话,一是可以减少网络传输开销,二是可以加快flink
>>  sql作业提交的速度。
>


Re:回复:报错显示为bug

2023-05-15 文章 lxk
你好,从报错来看是类型不兼容导致的。
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: 
Cannot cast "java.lang.String" to "java.time.LocalDateTime"  
可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换
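比如时间字段如果是 13 位的毫秒时间戳(BIGINT),可以用下面这类函数先转成时间类型再使用(表名、字段仅为示意):

// 仅为示意:BIGINT 毫秒时间戳转 TIMESTAMP_LTZ(3);字符串则可用 TO_TIMESTAMP(col, 'yyyy-MM-dd HH:mm:ss')
Table converted = tableEnv.sqlQuery(
        "SELECT funcId, TO_TIMESTAMP_LTZ(outTime, 3) AS ts FROM tableRequest");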

















At 2023-05-15 18:29:15, "小昌同学"  wrote:
>|
>package job;
>import bean.BaseInfo;
>import bean.MidInfo;
>import bean.OutInfo;
>import bean.ResultInfo;
>import com.alibaba.fastjson.JSON;
>import com.alibaba.fastjson.JSONObject;
>import config.FlinkConfig;
>import function.MyProcessFunction;
>import org.apache.flink.api.common.functions.MapFunction;
>import org.apache.flink.api.common.serialization.SimpleStringSchema;
>import org.apache.flink.api.java.tuple.Tuple2;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import org.apache.flink.streaming.api.datastream.DataStream;
>import org.apache.flink.streaming.api.datastream.DataStreamSource;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>import org.apache.flink.table.api.DataTypes;
>import org.apache.flink.table.api.Schema;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.TableSchema;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.types.DataType;
>import org.apache.flink.util.OutputTag;
>import sink.Sink2Mysql;
>import utils.DateUtil;
>import utils.DateUtils;
>import utils.JdbcUtil;
>
>import java.sql.Connection;
>import java.sql.PreparedStatement;
>import java.sql.ResultSet;
>import java.time.*;
>import java.util.Date;
>import java.util.HashMap;
>import java.util.Properties;
>
>public class RytLogAnly4 {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>//使用侧输出流
>OutputTag requestStream = new 
> OutputTag("requestStream") {
>};
>OutputTag answerStream = new 
> OutputTag("answerStream") {
>};
>
>//1、连接测试环境kafka的数据
>String servers = 
> FlinkConfig.config.getProperty("dev_bootstrap.servers");
>String topicName = FlinkConfig.config.getProperty("dev_topicName");
>String groupId = FlinkConfig.config.getProperty("dev_groupId");
>String devMode = FlinkConfig.config.getProperty("dev_mode");
>Properties prop = new Properties();
>prop.setProperty("bootstrap.servers", servers);
>prop.setProperty("group.id", groupId);
>prop.setProperty("auto.offset.reset", devMode);
>DataStreamSource sourceStream = env.addSource(new 
> FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
>//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- 
><315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
>
>//2、对源数据进行处理,生成baseInfo基类的数据
>SingleOutputStreamOperator baseInfoStream = 
> sourceStream.map(new MapFunction() {
>@Override
>public BaseInfo map(String value) throws Exception {
>JSONObject jsonObject = JSON.parseObject(value);
>//获取到不同的服务器IP
>String serverIp = jsonObject.getString("ip");
>//获取到不同的data的数据
>String datas = jsonObject.getString("data");
>
>String[] splits = datas.split("\n");
>HashMap dataMap = new HashMap<>();
>//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
>String time = splits[0].substring(7, 19);
>//将subData填充到自定义类型中,用来判断时请求还是应答
>String subData = datas.substring(0, 10);
>for (int i = 0; i < splits.length; i++) {
>if (splits[i].contains("=")) {
>splits[i] = splits[i].replaceFirst("=", "&");
>String[] temp = splits[i].split("&");
>if (temp.length > 1) {
>dataMap.put(temp[0].toLowerCase(), temp[1]);
>}
>}
>}
>return new BaseInfo(dataMap.get("action"), serverIp, 
>DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
>}
>});
>
>//3、使用process方法进行baseInfoStream流切割
>SingleOutputStreamOperator tagStream = 
> baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
>
>//4、根据不同的tag进行不同的输出流设定
>   

Re: Flink提交作业是否可以跳过上传作业jar包这一步?

2023-05-15 文章 shimin huang
可以考虑基于flink-kubernetes依赖下的KubernetesClusterDescriptor来启动任务,可以参考https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java
 

> 2023年5月15日 19:21,casel.chen  写道:
> 
> 我们开发了一个实时计算平台提交flink 
> sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat
>  
> jar有几百MB,又因为平台所在的k8s集群和作业真正运行的k8s集群是不同的,某些集群的跨k8s离群网络传输开销(跨地区甚至跨云厂商)比较大,而且这个flinksql.jar我们已经放到了k8s镜像当中,jobManager加载镜像后是可以在本地找到该jar的,所以想问一下Flink提交作业是否可以跳过上传作业jar包这一步?有没有参数指定直接去本地加载该作业jar包?这样的话,一是可以减少网络传输开销,二是可以加快flink
>  sql作业提交的速度。



Flink提交作业是否可以跳过上传作业jar包这一步?

2023-05-15 文章 casel.chen
我们开发了一个实时计算平台提交flink 
sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat
 
jar有几百MB,又因为平台所在的k8s集群和作业真正运行的k8s集群是不同的,某些集群的跨k8s离群网络传输开销(跨地区甚至跨云厂商)比较大,而且这个flinksql.jar我们已经放到了k8s镜像当中,jobManager加载镜像后是可以在本地找到该jar的,所以想问一下Flink提交作业是否可以跳过上传作业jar包这一步?有没有参数指定直接去本地加载该作业jar包?这样的话,一是可以减少网络传输开销,二是可以加快flink
 sql作业提交的速度。

StreamTable Environment initialized failed -- "Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath"

2023-05-15 文章 krislee

Hi  ALL,

OS:   CentOS 7.9

Flink version:  1.16.0


It looks like  I'm hitting a  notorious exception which had been 
discoverd since earlier fink version.  The issue was triggered


when below java code executed:

   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

More detailed trace is as below :

Exception in thread "main" org.apache.flink.table.api.TableException: Could not 
instantiate the executor. Make sure a planner module is on the classpath
at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109)
at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:101)
at 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
at 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
at 
com.sugon.cloud.paas.flink.cdc.FlinkCDC_mysql2doris_example.main(FlinkCDC_mysql2doris_example.java:63)

Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in 
the classpath.
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:533)
at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:106)
... 4 more



What I've done:
1) Added missed dependencies in "pom.xml",  for example:



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-uber</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>




2)Tried two methods to run application, got same error(see above)
  
   mvn exec:java -Dexec.mainClass="xxx"


   java -jar target/xxx.jar


   I'm confused by the error because all necessary jar files do exist in 
Maven's local repository
or FLINK_HOME's lib dir.
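
One possible explanation (an assumption, not verified against your project): both table dependencies above are declared with provided scope, so they are not on the classpath when the job is started with mvn exec:java or a plain java -jar outside a Flink distribution, and the ExecutorFactory cannot be discovered. A quick sanity check is to drop the provided scope for local runs, e.g.:

<!-- assumption: only for running the job locally, outside a Flink cluster -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <!-- no provided scope, so the planner and its ExecutorFactory are on the runtime classpath -->
</dependency>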


The completed "pom.xml" is included in attachment.


Thanks,
Leo




<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mycompany.cloud.bigdata.flink</groupId>
  <artifactId>flink-cdc-doris-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.16.0</flink.version>
    <flink.connector.version>2.3.0</flink.connector.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>${flink.connector.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-uber</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.mycompany.cloud.bigdata.flink.cdc.FlinkCDC_mysql2doris_example</mainClass>
                </transformer>
              </transformers>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>


回复:报错显示为bug

2023-05-15 文章 小昌同学
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//使用侧输出流
OutputTag<BaseInfo> requestStream = new 
OutputTag<BaseInfo>("requestStream") {
};
OutputTag<BaseInfo> answerStream = new 
OutputTag<BaseInfo>("answerStream") {
};

//1、连接测试环境kafka的数据
String servers = 
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new 
FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- 
<315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator<BaseInfo> baseInfoStream = 
sourceStream.map(new MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp, 
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});

//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator<BaseInfo> tagStream = 
baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

//4、根据不同的tag进行不同的输出流设定
DataStream<BaseInfo> requestDataStream = 
tagStream.getSideOutput(requestStream);
DataStream<BaseInfo> answerDataStream = 
tagStream.getSideOutput(answerStream);

requestDataStream.print("requestDataStream");
answerDataStream.print("answerDataStream");

//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
//5.1 先对请求流进行处理

Re:报错显示为bug

2023-05-15 文章 lxk
你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。

















在 2023-05-15 17:11:42,"小昌同学"  写道:
>各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: 
>org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
>compiled. This is a bug. Please file an issue. “
>flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作
>
>
>| |
>小昌同学
>|
>|
>ccc0606fight...@163.com
>|


报错显示为bug

2023-05-15 文章 小昌同学
各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue. “
flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作


| |
小昌同学
|
|
ccc0606fight...@163.com
|

Re:Re: Re: Flink广播流状态清理策略不生效

2023-05-15 文章 lxk



好的,感谢














在 2023-05-15 15:49:12,"Hangxiang Yu"  写道:
>Hi, 可以参考这个 Ticket ,就是讨论要给 Broadcast State 加 TTL 的,当时应该没有继续深入讨论:
>https://issues.apache.org/jira/browse/FLINK-13721
>方便的话你可以在 Ticket 下面也分享下你的使用场景、观察到的现象吗?也可以在 Ticket 下 Vote for this issue.
>我这边也会帮忙一起看下
>
>On Mon, May 15, 2023 at 1:41 PM lxk  wrote:
>
>> 这么看来,广播流好像不适合在生产中使用,状态会无限止的增长。这块官方有计划增加ttl功能吗。
>> 或者使用广播流的时候有没有什么能够手动清理状态的方法?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-05-15 11:28:54,"Hangxiang Yu"  写道:
>> >Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里
>> ><
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
>> >对
>> >State TTL 的描述;
>> >
>> >On Mon, May 15, 2023 at 11:05 AM lxk  wrote:
>> >
>> >> flink版本:1.14
>> >> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
>> >> 在主程序中,我设置了状态过期策略:
>> >>SingleOutputStreamOperator baiduStream =
>> >> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
>> >> AdvertiseClick.class)).name("BaiDuAdClick");
>> >> MapStateDescriptor baiduInfoMap = new
>> >> MapStateDescriptor<>("advertiseInfo", String.class,
>> AdvertiseClick.class);
>> >> StateTtlConfig ttlConfig = StateTtlConfig
>> >> .newBuilder(Time.days(7))
>> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> >> .cleanupFullSnapshot()
>> >> .cleanupIncrementally(200, true)
>> >> .build();
>> >> baiduInfoMap.enableTimeToLive(ttlConfig);
>> >> 在BroadcastProcessFunction中,我也设置了状态清除策略:
>> >> public void open(Configuration parameters) throws Exception {
>> >> jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
>> >> baiduInfoDesc = new MapStateDescriptor> >> AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
>> >> StateTtlConfig ttlConfig = StateTtlConfig
>> >> .newBuilder(Time.days(7))
>> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> >> .cleanupFullSnapshot()
>> >> .cleanupIncrementally(200, true)
>> >> .build();
>> >> baiduInfoDesc.enableTimeToLive(ttlConfig);
>> >>
>> >> }
>> >> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。
>> >>
>> >>
>> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
>> >>
>> >>
>> >>
>> >>
>> >> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。
>> >
>> >
>> >
>> >--
>> >Best,
>> >Hangxiang.
>>
>
>
>-- 
>Best,
>Hangxiang.


Re: Re: Flink广播流状态清理策略不生效

2023-05-15 文章 Hangxiang Yu
Hi, 可以参考这个 Ticket ,就是讨论要给 Broadcast State 加 TTL 的,当时应该没有继续深入讨论:
https://issues.apache.org/jira/browse/FLINK-13721
方便的话你可以在 Ticket 下面也分享下你的使用场景、观察到的现象吗?也可以在 Ticket 下 Vote for this issue.
我这边也会帮忙一起看下
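在 TTL 支持之前,一个常见的绕过办法是在 processBroadcastElement 里顺手做一次手动清理,大致示意如下(泛型、getClickId()/getUpdateTime() 等字段以及 7 天阈值都是按你的场景假设的):

// 仅为示意:每次广播数据到来时,顺带清理超过 7 天未更新的广播状态条目
@Override
public void processBroadcastElement(AdvertiseClick value, Context ctx,
                                    Collector<String> out) throws Exception {
    BroadcastState<String, AdvertiseClick> state = ctx.getBroadcastState(baiduInfoDesc);
    state.put(value.getClickId(), value);   // getClickId() 为假设的主键字段

    long expireBefore = System.currentTimeMillis() - Duration.ofDays(7).toMillis();
    List<String> expiredKeys = new ArrayList<>();
    for (Map.Entry<String, AdvertiseClick> entry : state.entries()) {
        if (entry.getValue().getUpdateTime() < expireBefore) {   // getUpdateTime() 为假设的更新时间字段
            expiredKeys.add(entry.getKey());
        }
    }
    for (String key : expiredKeys) {
        state.remove(key);
    }
}
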

On Mon, May 15, 2023 at 1:41 PM lxk  wrote:

> 这么看来,广播流好像不适合在生产中使用,状态会无限止的增长。这块官方有计划增加ttl功能吗。
> 或者使用广播流的时候有没有什么能够手动清理状态的方法?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-05-15 11:28:54,"Hangxiang Yu"  写道:
> >Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里
> ><
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
> >对
> >State TTL 的描述;
> >
> >On Mon, May 15, 2023 at 11:05 AM lxk  wrote:
> >
> >> flink版本:1.14
> >> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
> >> 在主程序中,我设置了状态过期策略:
> >>SingleOutputStreamOperator baiduStream =
> >> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
> >> AdvertiseClick.class)).name("BaiDuAdClick");
> >> MapStateDescriptor baiduInfoMap = new
> >> MapStateDescriptor<>("advertiseInfo", String.class,
> AdvertiseClick.class);
> >> StateTtlConfig ttlConfig = StateTtlConfig
> >> .newBuilder(Time.days(7))
> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >> .cleanupFullSnapshot()
> >> .cleanupIncrementally(200, true)
> >> .build();
> >> baiduInfoMap.enableTimeToLive(ttlConfig);
> >> 在BroadcastProcessFunction中,我也设置了状态清除策略:
> >> public void open(Configuration parameters) throws Exception {
> >> jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
> >> baiduInfoDesc = new MapStateDescriptor >> AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
> >> StateTtlConfig ttlConfig = StateTtlConfig
> >> .newBuilder(Time.days(7))
> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >> .cleanupFullSnapshot()
> >> .cleanupIncrementally(200, true)
> >> .build();
> >> baiduInfoDesc.enableTimeToLive(ttlConfig);
> >>
> >> }
> >> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。
> >>
> >>
> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
> >>
> >>
> >>
> >>
> >> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。
> >
> >
> >
> >--
> >Best,
> >Hangxiang.
>


-- 
Best,
Hangxiang.


Re:Re: Flink广播流状态清理策略不生效

2023-05-14 文章 lxk
这么看来,广播流好像不适合在生产中使用,状态会无限止的增长。这块官方有计划增加ttl功能吗。
或者使用广播流的时候有没有什么能够手动清理状态的方法?

















在 2023-05-15 11:28:54,"Hangxiang Yu"  写道:
>Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里
>对
>State TTL 的描述;
>
>On Mon, May 15, 2023 at 11:05 AM lxk  wrote:
>
>> flink版本:1.14
>> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
>> 在主程序中,我设置了状态过期策略:
>>SingleOutputStreamOperator baiduStream =
>> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
>> AdvertiseClick.class)).name("BaiDuAdClick");
>> MapStateDescriptor baiduInfoMap = new
>> MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
>> StateTtlConfig ttlConfig = StateTtlConfig
>> .newBuilder(Time.days(7))
>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> .cleanupFullSnapshot()
>> .cleanupIncrementally(200, true)
>> .build();
>> baiduInfoMap.enableTimeToLive(ttlConfig);
>> 在BroadcastProcessFunction中,我也设置了状态清除策略:
>> public void open(Configuration parameters) throws Exception {
>> jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
>> baiduInfoDesc = new MapStateDescriptor> AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
>> StateTtlConfig ttlConfig = StateTtlConfig
>> .newBuilder(Time.days(7))
>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> .cleanupFullSnapshot()
>> .cleanupIncrementally(200, true)
>> .build();
>> baiduInfoDesc.enableTimeToLive(ttlConfig);
>>
>> }
>> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。
>>
>>
>> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
>>
>>
>>
>>
>> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。
>
>
>
>-- 
>Best,
>Hangxiang.


Re: Flink广播流状态清理策略不生效

2023-05-14 文章 Hangxiang Yu
Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里
对
State TTL 的描述;

On Mon, May 15, 2023 at 11:05 AM lxk  wrote:

> flink版本:1.14
> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
> 在主程序中,我设置了状态过期策略:
>SingleOutputStreamOperator baiduStream =
> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
> AdvertiseClick.class)).name("BaiDuAdClick");
> MapStateDescriptor baiduInfoMap = new
> MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
> StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.days(7))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .cleanupFullSnapshot()
> .cleanupIncrementally(200, true)
> .build();
> baiduInfoMap.enableTimeToLive(ttlConfig);
> 在BroadcastProcessFunction中,我也设置了状态清除策略:
> public void open(Configuration parameters) throws Exception {
> jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
> baiduInfoDesc = new MapStateDescriptor AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
> StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.days(7))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .cleanupFullSnapshot()
> .cleanupIncrementally(200, true)
> .build();
> baiduInfoDesc.enableTimeToLive(ttlConfig);
>
> }
> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。
>
>
> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
>
>
>
>
> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。



-- 
Best,
Hangxiang.


Flink广播流状态清理策略不生效

2023-05-14 文章 lxk
flink版本:1.14
目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
在主程序中,我设置了状态过期策略:
   SingleOutputStreamOperator<AdvertiseClick> baiduStream = 
env.addSource(adBaiduClick).map(data -> JSON.parseObject(data, 
AdvertiseClick.class)).name("BaiDuAdClick");
MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new 
MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.cleanupIncrementally(200, true)
.build();
baiduInfoMap.enableTimeToLive(ttlConfig);
In the BroadcastProcessFunction I also configured the state cleanup policy:
public void open(Configuration parameters) throws Exception {
jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
baiduInfoDesc = new MapStateDescriptor("advertiseInfo", 
String.class, AdvertiseClick.class);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.cleanupIncrementally(200, true)
.build();
baiduInfoDesc.enableTimeToLive(ttlConfig);

}
However, judging from the current checkpoint size, the cleanup policy does not seem to take effect: the job has been running for 14 days and the overall checkpoint size keeps growing.


https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg




The TTL policies I use on other state all take effect; I don't know why it does not seem to work for the broadcast stream, or whether I am using it incorrectly. Hope someone can help take a look.

Re: flink state configuration

2023-05-14 by Shammon FY
Hi,

Does "if the state is not managed, the job will run into problems later" mean that the state
will grow too large? If so, you can add a day-level timestamp to the GROUP BY fields; then the
expiration of old state is no longer defeated by the keys being continuously updated. A sketch
follows below.
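For illustration, a sketch of that idea applied to the query quoted below. It reuses the table and column names from the original SQL, assumes outTime is a timestamp that the built-in DATE_FORMAT can handle, and is meant to be combined with tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1)):

```sql
-- sketch: fold the day into the key so each day produces fresh keys;
-- yesterday's keys stop being updated and their state can expire via idle state retention
SELECT dt, funcId, funcIdDesc, serverIp,
       CAST(MIN(maxTime - minTime) AS VARCHAR(200)) AS minTime, pk
FROM (
  SELECT
    DATE_FORMAT(a.outTime, 'yyyy-MM-dd') AS dt,
    a.funcId                             AS funcId,
    a.funcIdDesc                         AS funcIdDesc,
    a.serverIp                           AS serverIp,
    b.outTime                            AS maxTime,
    a.outTime                            AS minTime,
    CONCAT(a.funcId, a.serverIp)         AS pk
  FROM tableRequest a
  INNER JOIN tableAnswer b
  ON a.handleSerialNo = b.handleSerialNo
)
GROUP BY dt, funcId, funcIdDesc, serverIp, pk
```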

Best,
Shammon FY


On Fri, May 12, 2023 at 1:59 PM 小昌同学  wrote:

> Hi everyone, this is the Flink SQL I am using:
> select funcId,funcIdDesc,serverIp,cast(min(maxTime-minTime) as
> varchar(200)) as minTime,pk from
> (
>  select
>   a.funcId as funcId ,
>   a.funcIdDesc as funcIdDesc,
>   a.serverIp as serverIp,
>   b.outTime as maxTime,
>   a.outTime as minTime,
>   concat(a.funcId,a.serverIp) as pk
>  from tableRequest a
>  inner join tableAnswer b
>  on a.handleSerialNo=b.handleSerialNo
> )
> group by funcId,funcIdDesc,serverIp,pk
>
> Considering that the job will run into problems later if the state is not managed, the state management I want is this: the data computed by the SQL above only covers the current day (24 hours), and on the next day all of the previous state should be cleared. For this scenario, what parameters can I set to manage the state? I set "tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));" myself, but from the documentation this seems problematic: idle state retention resets the expiration time whenever the state is updated, while the effect I want is that, updated or not, any state that does not belong to today is cleared.
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: How does Flink SQL CEP handle two (or multiple) input streams?

2023-05-12 by CloudFunny
A two-stream join?
---- Original message ----
| From | casel.chen |
| Date | 2023-05-12 11:52 |
| To | user-zh@flink.apache.org |
| Subject | How does Flink SQL CEP handle two (or multiple) input streams? |
Can Flink SQL CEP only handle a single input stream? The examples I have seen online all do CEP processing on one input stream. Are there any examples of using CEP with two (or more) input streams? Thanks!

flink state configuration

2023-05-11 by 小昌同学
Hi everyone, this is the Flink SQL I am using:
select funcId,funcIdDesc,serverIp,cast(min(maxTime-minTime) as varchar(200)) as 
minTime,pk from
(
 select
  a.funcId as funcId ,
  a.funcIdDesc as funcIdDesc,
  a.serverIp as serverIp,
  b.outTime as maxTime,
  a.outTime as minTime,
  concat(a.funcId,a.serverIp) as pk
 from tableRequest a
 inner join tableAnswer b
 on a.handleSerialNo=b.handleSerialNo
)
group by funcId,funcIdDesc,serverIp,pk
Considering that the job will run into problems later if the state is not managed, the state management I want is this: the data computed by the SQL above only covers the current day (24 hours), and on the next day all of the previous state should be cleared. For this scenario, what parameters can I set to manage the state? I set "tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));" myself, but from the documentation this seems problematic: idle state retention resets the expiration time whenever the state is updated, while the effect I want is that, updated or not, any state that does not belong to today is cleared.


| |
小昌同学
|
|
ccc0606fight...@163.com
|

How does Flink SQL CEP handle two (or multiple) input streams?

2023-05-11 by casel.chen
Can Flink SQL CEP only handle a single input stream? The examples I have seen online all do CEP processing on one input stream. Are there any examples of using CEP with two (or more) input streams? Thanks!

Re: flink 1.13 partition.time-extractor.timestamp-pattern format

2023-05-10 by Shammon FY
Hi,

As the documentation above describes, if the partition is composed of multiple fields, you can
use partition.time-extractor.timestamp-pattern in the DDL to assemble those fields into whatever
partition format you need.
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00'
);

If you only have a single timestamp field and want to convert it into another time format, you
can follow the example in the documentation [1]: implement your own PartitionTimeExtractor and
specify it via partition.time-extractor.class, as sketched below.

Since flink-1.15 [2], partition.time-extractor.timestamp-formatter is also supported, which
converts the format of the partition timestamp assembled by timestamp-pattern.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-time-extractor
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/filesystem/#%e5%88%86%e5%8c%ba%e6%97%b6%e9%97%b4%e6%8f%90%e5%8f%96%e5%99%a8
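A rough sketch of such a custom extractor for dt='20200520' style partitions. This is illustrative only: the class name is made up, the single partition column is assumed to be named dt, and the interface is assumed to live in org.apache.flink.table.filesystem in Flink 1.13 (check the docs in [1]):

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

import org.apache.flink.table.filesystem.PartitionTimeExtractor;

/** Turns a dt partition value like '20200520' into a LocalDateTime. */
public class YyyyMmDdPartitionTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // assumes a single partition column named 'dt' holding values like 20200520
        String dt = partitionValues.get(partitionKeys.indexOf("dt"));
        return LocalDate.parse(dt, FORMAT).atStartOfDay();
    }
}
```

In the DDL this would then (per the 1.13 filesystem connector options) be wired up with 'partition.time-extractor.kind' = 'custom' and 'partition.time-extractor.class' = '...YyyyMmDdPartitionTimeExtractor'.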

Best,
Shammon FY

On Wed, May 10, 2023 at 5:42 PM 莫失莫忘  wrote:

>
> My Hive partitions use the format dt='20200520', but a flinkSQL streaming job writing to Hive
> only supports the 'yyyy-MM-dd hh:mm:ss' pattern. How can I specify a 'yyyyMMdd hh:mm:ss' style
> format for partition.time-extractor.timestamp-pattern? The Flink version is 1.13.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-commit
>
>
>
>
>
>
> --
>
>
>


[ANNOUNCE] Apache flink-connector-gcp-pubsub v3.0.1 released

2023-05-10 by Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-gcp-pubsub v3.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352770

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-elasticsearch v3.0.1 released

2023-05-10 by Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-elasticsearch v3.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352521

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-opensearch v1.0.1 released

2023-05-10 by Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-opensearch v1.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352686

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-pulsar v4.0.0 released

2023-05-10 by Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-pulsar v4.0.0. This release is compatible with Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352653

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-shaded v17.0 released

2023-05-10 by Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-shaded v17.0.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352445

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-rabbitmq v3.0.1 released

2023-05-10 by Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-rabbitmq v3.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352699

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


Re: Using different parallelism for different stages

2023-05-10 by 小昌同学
Got it, thanks for the guidance, everyone.


| |
小昌同学
|
|
ccc0606fight...@163.com
|
---- Original message ----
| From | yidan zhao |
| Date | 2023-04-21 10:50 |
| To |  |
| Subject | Re: Using different parallelism for different stages |
As for what to base it on: mainly the complexity of each operator's work — the more complex the work, the higher the parallelism it deserves. Then, while the job is running, you can also use backpressure to find bottlenecks and adjust.

Shammon FY wrote on Fri, Apr 21, 2023 at 09:04:

Hi

There are two ways to set the parallelism of a DataStream job:
1. Set the global parallelism on the ExecutionEnvironment via setParallelism
2. Call setParallelism on a DataStream to set the parallelism of that specific computation
A short sketch of both is given below.
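A small fragment illustrating both options (kafkaSource, kafkaSink and EtlMapFunction are placeholder names, not from this thread):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);                    // 1. global default, e.g. matching the 4 Kafka partitions

DataStream<String> source =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

source.map(new EtlMapFunction())
      .setParallelism(8)                  // 2. per-operator override for a heavier stage
      .sinkTo(kafkaSink)
      .setParallelism(2);                 // sinks can be overridden in the same way
```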

Best,
Shammon FY

On Fri, Apr 21, 2023 at 8:58 AM 小昌同学  wrote:



Hi everyone, I have a question about Flink parallelism.
My upstream data is Kafka (four partitions); after Flink ETL processing it is written in real
time to Kafka and MySQL. I'd like to set different parallelism at different stages — how can I do that with the DataStream API?
I'd also like to ask what should be considered when choosing the parallelism. Thanks for the guidance.
| |
小昌同学
|
|
ccc0606fight...@163.com
|


Re: Unsubscribe

2023-05-10 by Hongshun Wang
To unsubscribe from the user-zh@flink.apache.org mailing list, please send an email with any
content to user-zh-unsubscr...@flink.apache.org; see [1].

[1] https://flink.apache.org/zh/community/

On Wed, May 10, 2023 at 1:38 AM Zhanshun Zou  wrote:

> Unsubscribe
>


flink 1.13 partition.time-extractor.timestamp-pattern format

2023-05-10 by 莫失莫忘
My Hive partitions use the format dt='20200520', but a flinkSQL streaming job writing to Hive
only supports the 'yyyy-MM-dd hh:mm:ss' pattern. How can I specify a 'yyyyMMdd hh:mm:ss' style
format for partition.time-extractor.timestamp-pattern? The Flink version is 1.13.
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-commit

















Re: How to implement a payment reconciliation timeout alert with Flink SQL?

2023-05-10 by Hongshun Wang
Hi casel.chen,
My understanding of what you want is:
you want the check to be *triggered* 30 minutes after a record arrives on ThirdPartyPaymentStream;
if at that point the record has still not appeared in PlatformPaymentStream, it has timed out
unpaid and should be emitted downstream. That is, rather than judging the timeout only when the
other record finally arrives — at that point, even though the timeout has passed, the payment was
still made, so there is no need to raise an alert any more.

For the DataStream API, you can use a timer to trigger the delayed check, roughly as sketched below.
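A minimal sketch of the timer approach (illustrative, not from the thread: PlatformPayment and ThirdPartyPayment are placeholder types, and both streams are assumed to be keyed by the order id):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class PaymentTimeoutAlertFunction
        extends KeyedCoProcessFunction<String, PlatformPayment, ThirdPartyPayment, String> {

    private static final long TIMEOUT_MS = 30 * 60 * 1000L;
    private ValueState<Long> deadlineState;

    @Override
    public void open(Configuration parameters) {
        deadlineState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("deadline", Long.class));
    }

    @Override
    public void processElement1(PlatformPayment pay, Context ctx, Collector<String> out) throws Exception {
        // platform record arrived: wait at most 30 minutes for the third-party record
        long deadline = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        deadlineState.update(deadline);
        ctx.timerService().registerProcessingTimeTimer(deadline);
    }

    @Override
    public void processElement2(ThirdPartyPayment pay, Context ctx, Collector<String> out) throws Exception {
        // third-party record arrived in time: cancel the pending timer
        Long deadline = deadlineState.value();
        if (deadline != null) {
            ctx.timerService().deleteProcessingTimeTimer(deadline);
            deadlineState.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // timer fired and the state was never cleared: no match within 30 minutes
        if (deadlineState.value() != null) {
            out.collect("order " + ctx.getCurrentKey() + " not matched within 30 minutes");
            deadlineState.clear();
        }
    }
}
```

It would be wired up roughly as platformStream.keyBy(orderId).connect(thirdPartyStream.keyBy(orderId)).process(new PaymentTimeoutAlertFunction()).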

For SQL, a somewhat roundabout idea of mine (for reference only, not necessarily correct): use a
Pulsar sink (or RocketMQ or another broker with a delay queue) to write the PlatformPaymentStream
data into a 30-minute delay queue [1], then consume it with that delay as PlatformPaymentStream2.
Then *left join* PlatformPaymentStream2 with ThirdPartyPaymentStream; if a joined row has no
ThirdPartyPaymentStream part, the payment was not made in time.

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/pulsar/#%e6%b6%88%e6%81%af%e5%bb%b6%e6%97%b6%e5%8f%91%e9%80%81

Best
Hongshun

On Wed, May 10, 2023 at 8:45 AM Shammon FY  wrote:

> Hi
>
> With CEP, you can union the two streams into one and then match on the different event types
> via subtype when defining the CEP Pattern, for example:
> DataStream<Event> s1 = ...;
> DataStream<Event> s2 = ...;
> DataStream<Event> s = s1.union(s2)...;
> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
> .subtype(E1.class)
> .where(...)
> .followedBy("second")
> .subtype(E2.class)
> .where(...)
>
> With Flink SQL, you can implement it directly with a two-stream join plus a window.
>
> Best,
> Shammon FY
>
>
>
>
> On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:
>
> > Requirement: the business side implements a payment feature, and we need to use the
> > third-party payment platform's transaction data with Flink SQL for real-time reconciliation,
> > alerting on third-party transaction records that have not arrived within 30 minutes.
> > How should this two-stream real-time reconciliation scenario be implemented with Flink CEP SQL?
> >
> >
> The examples found online are all based on a single stream, while the scenario above involves two streams, PlatformPaymentStream and ThirdPartyPaymentStream.
>


Re: How to implement a payment reconciliation timeout alert with Flink SQL?

2023-05-09 by Shammon FY
Hi

With CEP, you can union the two streams into one and then match on the different event types
via subtype when defining the CEP Pattern, for example:
DataStream<Event> s1 = ...;
DataStream<Event> s2 = ...;
DataStream<Event> s = s1.union(s2)...;
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
.subtype(E1.class)
.where(...)
.followedBy("second")
.subtype(E2.class)
.where(...)

With Flink SQL, you can implement it directly with a two-stream join plus a window; one possible shape of that SQL is sketched below.
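As a sketch only (the table names, columns and the event-time/watermark definitions on pay_time are assumptions, not from the thread): a left interval join whose unmatched rows, emitted once the time bound has passed, mark payments with no third-party record within 30 minutes.

```sql
-- both tables are assumed to declare event-time attributes (watermarks) on pay_time
SELECT p.order_id, p.pay_time
FROM platform_payment AS p
LEFT JOIN third_party_payment AS t
  ON p.order_id = t.order_id
 AND t.pay_time BETWEEN p.pay_time AND p.pay_time + INTERVAL '30' MINUTE
WHERE t.order_id IS NULL;
```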

Best,
Shammon FY




On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:

> Requirement: the business side implements a payment feature, and we need to use the third-party
> payment platform's transaction data with Flink SQL for real-time reconciliation, alerting on
> third-party transaction records that have not arrived within 30 minutes.
> How should this two-stream real-time reconciliation scenario be implemented with Flink CEP SQL?
>
> The examples found online are all based on a single stream, while the scenario above involves two streams, PlatformPaymentStream and ThirdPartyPaymentStream.


Re: Failed to initialize delegation token receiver s3

2023-05-09 by Hangxiang Yu
Hi, this should be FLINK-31839, an already-confirmed issue that has been fixed in 1.17.1; see:
https://issues.apache.org/jira/browse/FLINK-31839

On Sat, May 6, 2023 at 5:00 PM maker_d...@foxmail.com <
maker_d...@foxmail.com> wrote:

> flink version: flink-1.17.0
> K8s application mode
>
> Delegation tokens have already been disabled in flink-conf:
> security.delegation.tokens.enabled: false
>
> The job was originally developed on 1.13 and ran fine; after upgrading Flink to 1.17.0 it no longer starts.
> At first delegation tokens were not disabled and the JobManager could not start; after disabling
> them the JobManager starts normally, but the TaskManager reports the following error:
>
> 2023-05-06 16:52:45,720 INFO
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Delegation token receiver s3 loaded and initialized
> 2023-05-06 16:52:45,722 INFO
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Delegation token receiver s3 loaded and initialized
> 2023-05-06 16:52:45,723 ERROR
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Failed to initialize delegation token receiver s3
> java.lang.IllegalStateException: Delegation token receiver with service
> name {} has multiple implementations [s3]
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:75)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.<init>(DelegationTokenReceiverRepository.java:60)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:245)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:293)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:486)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.main(KubernetesTaskExecutorRunner.java:46)
> [flink-dist-1.17.0.jar:1.17.0]
> 2023-05-06 16:52:45,729 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
> Terminating TaskManagerRunner with exit code 1.
> org.apache.flink.util.FlinkException: Failed to start the
> TaskManagerRunner.
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:488)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.main(KubernetesTaskExecutorRunner.java:46)
> [flink-dist-1.17.0.jar:1.17.0]
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> java.lang.IllegalStateException: Delegation token receiver with service
> name {} has multiple implementations [s3]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:93)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> 

Re: Unsubscribe

2023-05-09 by Yuxin Tan
To unsubscribe from the user-zh@flink.apache.org mailing list, please send an email with any
content to user-zh-unsubscr...@flink.apache.org; see [1].

[1] https://flink.apache.org/zh/community/

Best,
Yuxin


胡家发 <15802974...@163.com> wrote on Sun, May 7, 2023 at 22:14:

> Unsubscribe


How to implement a payment reconciliation timeout alert with Flink SQL?

2023-05-09 by casel.chen
Requirement: the business side implements a payment feature, and we need to use the third-party payment platform's transaction data with Flink SQL for real-time reconciliation, alerting on third-party transaction records that have not arrived within 30 minutes.
How should this two-stream real-time reconciliation scenario be implemented with Flink CEP SQL?
The examples found online are all based on a single stream, while the scenario above involves two streams, PlatformPaymentStream and ThirdPartyPaymentStream.

Unsubscribe

2023-05-09 by 张胜军



Unsubscribe












The following is the content of the forwarded email
From:"胡家发" <15802974...@163.com>
To:user-zh 
Date:2023-05-07 22:13:55
Subject: Unsubscribe

Unsubscribe





Re: Unsubscribe

2023-05-09 by Hongshun Wang
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
if you want to unsubscribe from the user-zh@flink.apache.org mailing list;
you can refer to [1][2] for more details on managing your mailing list subscription.

Best Hongshun,

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2]https://flink.apache.org/community.html#mailing-lists


On Sun, May 7, 2023 at 10:14 PM 胡家发 <15802974...@163.com> wrote:

> Unsubscribe


Unsubscribe

2023-05-09 by Zhanshun Zou
Unsubscribe


Re: Unsubscribe

2023-05-07 by Junrui Lee
To unsubscribe from the user-zh@flink.apache.org mailing list, please send an email with any
content to user-zh-unsubscr...@flink.apache.org; see [1].

[1] https://flink.apache.org/zh/community/

Best,
Junrui

wangwei wrote on Sun, May 7, 2023 at 17:55:

> Unsubscribe
>
>


Unsubscribe

2023-05-07 by wangwei
Unsubscribe



Side-outputting parse-error records with the Flink SQL canal-json format

2023-05-06 by casel.chen
In production we use Flink SQL to consume canal-json data from a Kafka topic, and we found that
some records contain time fields with the value 0000-00-00 00:00:00 that cannot be parsed, so we
added the 'canal-json.ignore-parse-errors' = 'true' option. The job now runs fine, but we would
also like to know which records failed to parse so that they can be sent back to the upstream
business system for self-checking. Besides ignoring them, is there a way to output these
parse-error records to another Kafka topic? Thanks!

Re:Re: CheckpointedFunction and KeyedState

2023-05-06 by sjf0115
Thanks.
On 2023-05-06 10:36:02, "Hangxiang Yu" wrote:
>Hi, initializing the state in initializeState is fine, but try not to access KeyedState in
>initializeState or snapshotState; it is best done in the actual function, i.e. the flatMap here.
>The reason is that access to KeyedState is bound to the current key: before processing each
>record, the framework implicitly sets the current key, which guarantees that every KeyedState
>operation works on a well-defined key/value pair.
>In initializeState and snapshotState there is no such implicit set by the framework, so you
>would effectively be updating the value of some indeterminate key. If you really have to do it
>there, you need to obtain the KeyContext and set the key yourself, but that is not recommended.
>
>On Fri, May 5, 2023 at 10:58 PM sjf0115  wrote:
>
>> The initializeState method of the CheckpointedFunction interface gives access to a
>> FunctionInitializationContext, and the FunctionInitializationContext exposes not only the
>> OperatorStateStore but also the KeyedStateStore. CheckpointedFunction is usually used to work
>> with OperatorState, but KeyedState can also be obtained with code like this:
>> ```java
>> context.getKeyedStateStore().getState(stateDescriptor);
>> ```
>> I'd like to ask about operating on KeyedState through CheckpointedFunction, as in the code below. Is this a recommended way to implement it? Are there any problems with it?
>> ```java
>> public static class TemperatureAlertFlatMapFunction extends
>> RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>>
>> implements CheckpointedFunction {
>> // temperature-difference alert threshold
>> private double threshold;
>> // previous temperature
>> private ValueState<Double> lastTemperatureState;
>> private Double lastTemperature;
>> public TemperatureAlertFlatMapFunction(double threshold) {
>> this.threshold = threshold;
>> }
>>
>>
>> @Override
>> public void flatMap(Tuple2<String, Double> sensor,
>> Collector<Tuple3<String, Double, Double>> out) throws Exception {
>> String sensorId = sensor.f0;
>> // current temperature
>> double temperature = sensor.f1;
>> // save the current temperature
>> lastTemperature = temperature;
>> // is this the first reported temperature?
>> if (Objects.equals(lastTemperature, null)) {
>> return;
>> }
>> double diff = Math.abs(temperature - lastTemperature);
>> if (diff > threshold) {
>> // emit if the temperature change exceeds the threshold
>> out.collect(Tuple3.of(sensorId, temperature, diff));
>> }
>> }
>>
>>
>> @Override
>> public void snapshotState(FunctionSnapshotContext context) throws
>> Exception {
>> // after reading the latest temperature, update the state that stores the previous temperature
>> if (!Objects.equals(lastTemperature, null)) {
>> lastTemperatureState.update(lastTemperature);
>> }
>> }
>>
>>
>> @Override
>> public void initializeState(FunctionInitializationContext context)
>> throws Exception {
>> ValueStateDescriptor<Double> stateDescriptor = new
>> ValueStateDescriptor<>("lastTemperature", Double.class);
>> lastTemperatureState =
>> context.getKeyedStateStore().getState(stateDescriptor);
>> if (context.isRestored()) {
>> lastTemperature = lastTemperatureState.value();
>> }
>> }
>> }
>> ```
>>
>>
>
>-- 
>Best,
>Hangxiang.


Re: Unsubscribe

2023-05-06 by Hongshun Wang
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
if you want to unsubscribe from the user-zh@flink.apache.org mailing list;
you can refer to [1][2] for more details on managing your mailing list subscription.

Best Hongshun,

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2]https://flink.apache.org/community.html#mailing-lists

On Fri, Apr 21, 2023 at 10:50 AM 杨光跃  wrote:

>
>
> Unsubscribe
> | |
> 杨光跃
> |
> |
> yangguangyuem...@163.com
> |
>
>


Re: What is the difference between streaming.api.operators and streaming.runtime.operators?

2023-05-06 by Hongshun Wang
Let me share my personal take: streaming.api.operators is the stream API offered to users, who
can use and extend those interfaces. streaming.runtime.operators, on the other hand, is invisible
to users and is invoked automatically by Flink at execution time. For example: users can configure
a sink themselves, e.g. a KafkaSink, but the state handling and transaction commit on output
(CommitterOperator) is unified logic that Flink generates automatically for the different kinds of
sinks, and users do not need to configure or implement it themselves.

Best
Hongshun

On Sat, May 6, 2023 at 11:57 AM yidan zhao  wrote:

> As the subject says — I'd like to know what the criterion for this split is.
>


Unsubscribe

2023-05-05 by willluzheng



Unsubscribe

2023-05-05 by willluzheng
Unsubscribe

What is the difference between streaming.api.operators and streaming.runtime.operators?

2023-05-05 by yidan zhao
As the subject says — I'd like to know what the criterion for this split is.


Re: CheckpointedFunction and KeyedState

2023-05-05 by Hangxiang Yu
Hi, initializing the state in initializeState is fine, but try not to access KeyedState in
initializeState or snapshotState; it is best done in the actual function, i.e. the flatMap here.
The reason is that access to KeyedState is bound to the current key: before processing each
record, the framework implicitly sets the current key, which guarantees that every KeyedState
operation works on a well-defined key/value pair.
In initializeState and snapshotState there is no such implicit set by the framework, so you would
effectively be updating the value of some indeterminate key. If you really have to do it there,
you need to obtain the KeyContext and set the key yourself, but that is not recommended. A sketch
of the recommended split follows below.
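To make that concrete, a sketch (adapted from the code quoted below, not a drop-in fix) of keeping initializeState to handle acquisition only and doing all reads and updates inside flatMap; these are method bodies for the TemperatureAlertFlatMapFunction class in the quoted message:

```java
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // only obtain the handle here; there is no current key yet, so no value()/update()
    lastTemperatureState = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("lastTemperature", Double.class));
}

@Override
public void flatMap(Tuple2<String, Double> sensor, Collector<Tuple3<String, Double, Double>> out)
        throws Exception {
    Double last = lastTemperatureState.value();   // bound to the current key
    lastTemperatureState.update(sensor.f1);       // likewise keyed to this record's key
    if (last != null && Math.abs(sensor.f1 - last) > threshold) {
        out.collect(Tuple3.of(sensor.f0, sensor.f1, Math.abs(sensor.f1 - last)));
    }
}

@Override
public void snapshotState(FunctionSnapshotContext context) {
    // nothing to do: the keyed state is already kept up to date in flatMap
}
```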

On Fri, May 5, 2023 at 10:58 PM sjf0115  wrote:

> The initializeState method of the CheckpointedFunction interface gives access to a
> FunctionInitializationContext, and the FunctionInitializationContext exposes not only the
> OperatorStateStore but also the KeyedStateStore. CheckpointedFunction is usually used to work
> with OperatorState, but KeyedState can also be obtained with code like this:
> ```java
> context.getKeyedStateStore().getState(stateDescriptor);
> ```
> I'd like to ask about operating on KeyedState through CheckpointedFunction, as in the code below. Is this a recommended way to implement it? Are there any problems with it?
> ```java
> public static class TemperatureAlertFlatMapFunction extends
> RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>>
> implements CheckpointedFunction {
> // temperature-difference alert threshold
> private double threshold;
> // previous temperature
> private ValueState<Double> lastTemperatureState;
> private Double lastTemperature;
> public TemperatureAlertFlatMapFunction(double threshold) {
> this.threshold = threshold;
> }
>
>
> @Override
> public void flatMap(Tuple2<String, Double> sensor,
> Collector<Tuple3<String, Double, Double>> out) throws Exception {
> String sensorId = sensor.f0;
> // current temperature
> double temperature = sensor.f1;
> // save the current temperature
> lastTemperature = temperature;
> // is this the first reported temperature?
> if (Objects.equals(lastTemperature, null)) {
> return;
> }
> double diff = Math.abs(temperature - lastTemperature);
> if (diff > threshold) {
> // emit if the temperature change exceeds the threshold
> out.collect(Tuple3.of(sensorId, temperature, diff));
> }
> }
>
>
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
> // after reading the latest temperature, update the state that stores the previous temperature
> if (!Objects.equals(lastTemperature, null)) {
> lastTemperatureState.update(lastTemperature);
> }
> }
>
>
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> ValueStateDescriptor<Double> stateDescriptor = new
> ValueStateDescriptor<>("lastTemperature", Double.class);
> lastTemperatureState =
> context.getKeyedStateStore().getState(stateDescriptor);
> if (context.isRestored()) {
> lastTemperature = lastTemperatureState.value();
> }
> }
> }
> ```
>
>

-- 
Best,
Hangxiang.


CheckpointedFunction and KeyedState

2023-05-05 by sjf0115
The initializeState method of the CheckpointedFunction interface gives access to a
FunctionInitializationContext, and the FunctionInitializationContext exposes not only the
OperatorStateStore but also the KeyedStateStore. CheckpointedFunction is usually used to work
with OperatorState, but KeyedState can also be obtained with code like this:
```java
context.getKeyedStateStore().getState(stateDescriptor);
```
I'd like to ask about operating on KeyedState through CheckpointedFunction, as in the code below. Is this a recommended way to implement it? Are there any problems with it?
```java
public static class TemperatureAlertFlatMapFunction extends
RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>>
implements CheckpointedFunction {
// temperature-difference alert threshold
private double threshold;
// previous temperature
private ValueState<Double> lastTemperatureState;
private Double lastTemperature;
public TemperatureAlertFlatMapFunction(double threshold) {
this.threshold = threshold;
}


@Override
public void flatMap(Tuple2<String, Double> sensor, Collector<Tuple3<String, Double, Double>> out) throws Exception {
String sensorId = sensor.f0;
// current temperature
double temperature = sensor.f1;
// save the current temperature
lastTemperature = temperature;
// is this the first reported temperature?
if (Objects.equals(lastTemperature, null)) {
return;
}
double diff = Math.abs(temperature - lastTemperature);
if (diff > threshold) {
// emit if the temperature change exceeds the threshold
out.collect(Tuple3.of(sensorId, temperature, diff));
}
}


@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception 
{
// after reading the latest temperature, update the state that stores the previous temperature
if (!Objects.equals(lastTemperature, null)) {
lastTemperatureState.update(lastTemperature);
}
}


@Override
public void initializeState(FunctionInitializationContext context) throws 
Exception {
ValueStateDescriptor<Double> stateDescriptor = new 
ValueStateDescriptor<>("lastTemperature", Double.class);
lastTemperatureState = 
context.getKeyedStateStore().getState(stateDescriptor);
if (context.isRestored()) {
lastTemperature = lastTemperatureState.value();
}
}
}
```



Unsubscribe

2023-05-05 by willluzheng
Unsubscribe

Unsubscribe

2023-05-05 by JohnLiu




Liu Jing (John Liu)  DBA
Tenvine Technical Department
B4-712, Kexing Science Park, Keji Middle 3rd Road, Hi-Tech Park, Nanshan District, Shenzhen 518052
Mobile: 18820970747  



Re: Can log in to the Flink issue tracker, but the Flink Chinese mailing list account/password is wrong — what could be the reason?

2023-05-05 by Hongshun Wang
>
>  can log in to the Flink issue tracker

Is that a JIRA account?

the Flink Chinese mailing list account and password

What is a "Flink Chinese mailing list account"? Is there a link to its login page?

On Wed, Apr 19, 2023 at 11:36 AM kcz <573693...@qq.com.invalid> wrote:

> Could you help me check where I went wrong? My account is kcz. I would like to ask the experts about a flink avro question.
>
>
>
>
> kcz
> 573693...@qq.com
>
>
>
> 

