flink jdbcsink: question about the number of connections

2023-05-29 Post by 小昌同学
Hi all, I'd like to ask about the number of connections used by the Flink JdbcSink.
My code is below. I looked through the source code and could not find any setting for the number of connections when sinking to MySQL. How should this be configured?
Thanks in advance for your guidance.

outPutInfoStream.addSink(JdbcSink.sink(
        "REPLACE into InputInfo (breakCode, breakName, breakDuration, breakRule, "
                + "breakPrimaryKey, breakStep, breakStepType, breakTime, breakSendTime, breakArgs) "
                + "values (?,?,?,?,?,?,?,?,?,?)",
        (statement, InPutInfo) -> {
            statement.setString(1, InPutInfo.getBreakCode());
            statement.setString(2, InPutInfo.getBreakName());
            statement.setLong(3, InPutInfo.getBreakDuration());
            statement.setString(4, InPutInfo.getBreakRule());
            statement.setString(5, InPutInfo.getBreakPrimaryKey());
            statement.setString(6, InPutInfo.getBreakStep());
            statement.setString(7, InPutInfo.getBreakStepType());
            statement.setString(8, InPutInfo.getBreakTime());
            statement.setString(9, DateUtil.format(new Date()));
            statement.setString(10, String.valueOf(InPutInfo.getBreakArgs()));
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(10)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("111")
                .withPassword("111")
                .build()
)).name("sink-mysql");
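A hedged note on the question itself: from what I can see in the flink-connector-jdbc source, JdbcSink obtains its connection through a SimpleJdbcConnectionProvider that opens exactly one JDBC connection per parallel sink subtask, and JdbcConnectionOptions exposes no pool-size setting. The total number of MySQL connections therefore follows the sink's parallelism, so the usual way to bound it is to cap the operator's parallelism. A minimal sketch, where sql, statementBuilder, executionOptions and connectionOptions stand in for the four arguments shown above:

// Hedged sketch: JdbcSink opens one connection per parallel subtask (via
// SimpleJdbcConnectionProvider), so capping the sink's parallelism caps the
// number of MySQL connections. sql, statementBuilder, executionOptions and
// connectionOptions are stand-ins for the four arguments shown above.
outPutInfoStream
        .addSink(JdbcSink.sink(sql, statementBuilder, executionOptions, connectionOptions))
        .setParallelism(2) // at most 2 concurrent MySQL connections for this sink
        .name("sink-mysql");

As far as I know the shipped connector does not provide real connection pooling; if pooling (e.g. HikariCP) is required, it would have to be wired into a custom sink.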


小昌同学
ccc0606fight...@163.com

Re: flink: outputting abnormal data

2023-05-29 Post by 小昌同学
Hi, the data source is Kafka, and I am using the DataStream API.
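In the meantime, one common DataStream API pattern for pinpointing such a record is to wrap the parsing/mapping step in a try/catch and route the offending raw record to a side output, instead of letting the NullPointerException fail the job. A minimal sketch, where kafkaStream, InputInfo, and parse() are hypothetical stand-ins for the actual source stream, record type, and the step that currently throws:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hedged sketch: capture records that fail to parse in a side output instead
// of letting the exception fail the job. kafkaStream, InputInfo and parse()
// are stand-ins for the actual source stream, record type, and parsing step.
final OutputTag<String> dirtyTag = new OutputTag<String>("dirty-records") {};

SingleOutputStreamOperator<InputInfo> parsed = kafkaStream
        .process(new ProcessFunction<String, InputInfo>() {
            @Override
            public void processElement(String raw, Context ctx, Collector<InputInfo> out)
                    throws Exception {
                try {
                    out.collect(parse(raw)); // the step that currently throws the NPE
                } catch (Exception e) {
                    ctx.output(dirtyTag, raw); // keep the offending raw record
                }
            }
        });

// Print (or sink) the captured dirty records separately for inspection.
parsed.getSideOutput(dirtyTag).print("dirty-record");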


小昌同学
ccc0606fight...@163.com
---- Original Message ----
From: Weihua Hu
Date: 2023-05-29 15:29
To:
Subject: Re: flink: outputting abnormal data
Hi,

What data source are you using? Kafka? And are you using Flink SQL or the DataStream API?

Could you also paste the exception stack trace?

Best,
Weihua


On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:


Hi all, I have a job that has been running for a long time, but recently some dirty data from the source system caused it to fail. The YARN log shows a NullPointerException. I would now like to capture that one dirty record. Is there any way to do this? Thanks for your guidance.


小昌同学
ccc0606fight...@163.com


Re: [ANNOUNCE] Apache Flink 1.16.2 released

2023-05-29 Post by Jing Ge
Hi Weijie,

Thanks for your contribution and feedback! If there are reasons that prevent us
from upgrading them, we could still leverage virtualenv or pipenv to create a
dedicated environment for the Flink release. WDYT?

cc Dian Fu

@Dian
I was wondering if you know the reason. Thanks!

Best regards,
Jing




On Mon, May 29, 2023 at 6:27 AM weijie guo wrote:

> Hi Jing,
>
> Thank you for caring about the release process. Overall, the entire process
> went smoothly: we have very comprehensive documentation [1] to guide the
> work, thanks to the contributions of previous release managers and the
> community.
>
> Regarding obstacles, I only hit one minor problem: we used an older
> twine (1.12.0) to deploy the Python artifacts to PyPI, and its compatible
> dependencies (such as urllib3) are also old. When I ran twine upload, the
> process couldn't work as expected because the version of urllib3 installed
> on my machine was newer. To work around this, I had to proactively
> downgrade some dependencies. I added a notice to the cwiki page [1] to
> prevent future release managers from hitting the same problem. This seems
> to be a known issue (see the comments in [2]) that has been resolved in
> newer versions of twine. I wonder if we can upgrade the version of twine?
> Does anyone remember why we pinned such an old version (1.12.0)?
>
> Best regards,
>
> Weijie
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
>
> [2] https://github.com/pypa/twine/issues/997
>
>
> Jing Ge wrote on Sat, May 27, 2023 at 00:15:
>
> > Hi Weijie,
> >
> > Thanks again for your effort. I was wondering if there were any obstacles
> > you had to overcome while releasing 1.16.2 and 1.17.1 that could point us
> > to any improvements w.r.t. the release process and management?
> >
> > Best regards,
> > Jing
> >
> > On Fri, May 26, 2023 at 4:41 PM Martijn Visser wrote:
> >
> > > Thank you Weijie and those who helped with testing!
> > >
> > > On Fri, May 26, 2023 at 1:06 PM weijie guo wrote:
> > >
> > > > The Apache Flink community is very happy to announce the release of
> > > > Apache Flink 1.16.2, which is the second bugfix release for the
> Apache
> > > > Flink 1.16 series.
> > > >
> > > >
> > > >
> > > > Apache Flink® is an open-source stream processing framework for
> > > > distributed, high-performing, always-available, and accurate data
> > > > streaming applications.
> > > >
> > > >
> > > >
> > > > The release is available for download at:
> > > >
> > > > https://flink.apache.org/downloads.html
> > > >
> > > >
> > > >
> > > > Please check out the release blog post for an overview of the
> > > > improvements for this bugfix release:
> > > >
> > > > https://flink.apache.org/news/2023/05/25/release-1.16.2.html
> > > >
> > > >
> > > >
> > > > The full release notes are available in Jira:
> > > >
> > > >
> > > >
> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352765
> > > >
> > > >
> > > >
> > > > We would like to thank all contributors of the Apache Flink community
> > > > who made this release possible!
> > > >
> > > >
> > > >
> > > > Feel free to reach out to the release managers (or respond to this
> > > > thread) with feedback on the release process. Our goal is to
> > > > constantly improve the release process. Feedback on what could be
> > > > improved or things that didn't go so well are appreciated.
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Release Manager
> > > >
> > >
> >
>


Re: flink: outputting abnormal data

2023-05-29 Post by Weihua Hu
Hi,

What data source are you using? Kafka? And are you using Flink SQL or the DataStream API?

Could you also paste the exception stack trace?

Best,
Weihua


On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:

>
> Hi all, I have a job that has been running for a long time, but recently some dirty data from the source system caused it to fail. The YARN log shows a NullPointerException. I would now like to capture that one dirty record. Is there any way to do this? Thanks for your guidance.
>
>
> 小昌同学
> ccc0606fight...@163.com


Custom trigger firing problem

2023-05-29 Post by 吴先生
I have a custom trigger that is meant to emit results either when maxCount is reached or when the window end time is reached.
The problem: for one and the same window, onProcessingTime fires multiple times at the window end. In theory each window should fire only once, when its end time is reached. What could be the cause?
Main-class code snippet:
SingleOutputStreamOperator windowMap = afterMap
        .timeWindowAll(Time.seconds(5))
        .trigger(new CountAndProcessingTimeTrigger(100))
        .process(simpleConfig.getWindowFunction().newInstance());
Trigger code:


public class CountAndProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    // maximum element count per window
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc;

    public CountAndProcessingTimeTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor<>("count_time",
                new CountAndProcessingTimeTrigger.Sum(), LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    /**
     * Called for each element added to the window.
     *
     * @param o element
     * @param timestamp timestamp
     * @param window window
     * @param triggerContext triggerContext
     * @return TriggerResult
     * CONTINUE: do nothing.
     * FIRE: trigger the computation and keep the data in the window.
     * PURGE: simply drop the window contents, retaining any potential
     * meta-information about the window and any trigger state.
     * FIRE_AND_PURGE: trigger the computation, then clear the elements in the window.
     * @throws Exception Exception
     */
    @Override
    public TriggerResult onElement(Object o, long timestamp, TimeWindow window,
            TriggerContext triggerContext) throws Exception {
        triggerContext.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            log.info("countTrigger: {}", countState.get());
            countState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when the registered processing-time timer fires (window end).
     *
     * @param timestamp timestamp
     * @param window window
     * @param triggerContext triggerContext
     * @return TriggerResult
     * @throws Exception Exception
     */
    @Override
    public TriggerResult onProcessingTime(long timestamp, TimeWindow window,
            TriggerContext triggerContext) throws Exception {
        ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
        log.info("timeTrigger: {}, windowMaxTimestamp: {}", countState.get(),
                window.maxTimestamp());
        countState.clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long timestamp, TimeWindow window,
            TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return false;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(stateDesc);
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    /**
     * Called when the window is discarded.
     *
     * @param window window
     * @param triggerContext triggerContext
     * @throws Exception Exception
     */
    @Override
    public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
        triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
        triggerContext.getPartitionedState(stateDesc).clear();
    }

    /**
     * Counting reduce function.
     */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
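One thing worth double-checking, offered as a hedged guess rather than a confirmed diagnosis: when the count path returns FIRE_AND_PURGE, the processing-time timer registered in onElement stays pending, so the end-of-window timer can still fire for a pane that was already emitted and purged early. Deleting the timer on the early-fire path removes that source of extra onProcessingTime calls, and the next arriving element simply re-registers it:

// Hedged tweak to the early-fire branch of onElement: also drop the pending
// end-of-window timer; the next incoming element re-registers it, so the
// window can still fire at its end time.
if (countState.get() >= maxCount) {
    log.info("countTrigger: {}", countState.get());
    countState.clear();
    triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
    return TriggerResult.FIRE_AND_PURGE;
}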


吴先生
15951914...@163.com