在 mysql 中创建表
CREATE TABLE `p_port_packet_loss_5m` (
`id` binary(16) NOT NULL,
`coltime` datetime NOT NULL,
...
在flink 中创建表
create table if not exists p_port_packet_loss_5m
(
id bytes,
coltime timestamp,
...)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip:port/mydatabase',
mvn clean install -T 4C -Pfast -DskipTests -Dcheckstyle.skip=true
-DnpmRegistryURL=https://registry.npm.taobao.org 可以用这个试试
--
Sent from: http://apache-flink.147419.n8.nabble.com/
还没有,你可以关注下这个issue[1]
祝好,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-21183
> 在 2021年2月1日,13:29,macdoor 写道:
>
> 当前的 1.13-snapshot 支持了吗?我可以试试吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
官网上没有,在github上https://github.com/ververica/flink-cdc-connectors
```
SourceFunction sourceFunction = MySQLSource.builder()
.hostname("localhost") .port(3306) .databaseList("inventory") // monitor all
tables under inventory database .username("flinkuser") .password("flinkpw")
.deserializer(new
我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。
javenjiangfsof 于2021年2月1日周一 下午1:40写道:
> DataStream API,像下面这样
> ```
> val list = ... //i use jdbc to get the init data
> val dimensionInitStream = env.fromCollection(list)
> val dimension =
>
当前的 1.13-snapshot 支持了吗?我可以试试吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
DataStream API,像下面这样
```
val list = ... //i use jdbc to get the init data
val dimensionInitStream = env.fromCollection(list)
val dimension =
dimensionStream.union(dimensionInitStream).broadcast(descriptor)
mainStream.connect(dimensionStream)
...
```
FlinkSQL ?
javenjiangfsof 于2021年2月1日周一 上午11:40写道:
> Hi 社区的各位
>
> 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
> +
> broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
> 1.初始化通过jdbc获取,通过fromCollection处理后,union
>
Okay, 和我理解的一样,这个时间上是 event time, 基于event time的 interval join
需要定义watermark,目前hive表还不支持定义watermark,1.13应该会支持。
> 在 2021年2月1日,10:58,macdoor 写道:
>
> p1.time 是数据记录里的时间,也用这个时间做分区
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
很详尽了,非常感谢 @tison !
发件人: tison
发送时间: 2021-02-01 11:43
收件人: user-zh
主题: Re: Re: 水印的作用请教
对于 StreamingFileSink 可以查看这两份资料
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
Thanks Xintong for being the release manager and everyone who helped with
the release!
Cheers,
Zhu
Dian Fu 于2021年1月29日周五 下午5:56写道:
> Thanks Xintong for driving this release!
>
> Regards,
> Dian
>
> 在 2021年1月29日,下午5:24,Till Rohrmann 写道:
>
> Thanks Xintong for being our release manager. Well
Thanks Xintong for being the release manager and everyone who helped with
the release!
Cheers,
Zhu
Dian Fu 于2021年1月29日周五 下午5:56写道:
> Thanks Xintong for driving this release!
>
> Regards,
> Dian
>
> 在 2021年1月29日,下午5:24,Till Rohrmann 写道:
>
> Thanks Xintong for being our release manager. Well
对于 StreamingFileSink 可以查看这两份资料
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
https://stackoverflow.com/questions/54763160/is-it-possible-that-bucketing-sink-create-bucket-on-event-time
默认的,watermark 和 (EventTime) Timestamp 信息会带到 BucketAssigner 实现所需的
Hi 社区的各位
最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
+
broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
1.初始化通过jdbc获取,通过fromCollection处理后,union
Hi,
看你之前发的邮件,你现在是把kerberos相关的配置放在某一个flink-conf.yaml里,然后启动了一个local模式吧?
但是local模式的pyflink
shell是不会主动读取任何flink-conf.yaml的。需要配置环境变量FLINK_HOME,将相关配置写入$FLINK_HOME/conf/flink-conf.yaml里,并且只有在提交job时候(flink
run、remote模式或者yarn模式)才会去读取flink-conf.yaml里的内容。
如果执意要在local模式下尝试,可以通过以下代码:
from
StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了
发件人: tison
发送时间: 2021-02-01 11:01
收件人: user-zh
主题: Re: Re: 水印的作用请教
请问你使用哪种 SinkConnector 写入 HDFS 呢?
Best,
tison.
amenhub 于2021年2月1日周一 上午10:58写道:
> >>>
>
p1.time 是数据记录里的时间,也用这个时间做分区
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Thanks for testing the pod template. I really hope to get more feedbacks
from your use case.
Best,
Yang
Emilien Kenler 于2021年2月1日周一 上午9:45写道:
> Hello,
>
> I think this would solve our problem.
> We are also looking at supporting affinity rules, and it would also cover
> it.
>
> I'm going to
请问你使用哪种 SinkConnector 写入 HDFS 呢?
Best,
tison.
amenhub 于2021年2月1日周一 上午10:58写道:
> >>>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
>
>
>
>
>
>>> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
发件人: amenhub
发送时间: 2021-02-01 10:44
收件人: user-zh
主题: Re: Re: 水印的作用请教
谢谢回复!
嗯,flink 中 很多时间函数比如PROCTIME()/CURRENT_TIMESTAMP 返回的值都是
UTC+0的时间值,这里的timezone设置对这些函数不生效的,这些函数是有点时区问题的,
目前只能在代码里通过加减时区偏移绕过。
> 在 2021年2月1日,10:50,沉醉寒風 <1039601...@qq.com> 写道:
>
> 在代码中这样设置 streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
> 也不管用. 还是要自己手动去加减时间才能做到,方法比较笨,
>
>
>
>
>
streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
. ,??,
----
??:
Hi,
时区不生效在你的代码中是体现在那些地方呀?目前flink sql是有些时区问题,社区也希望在1.13能解决掉。
> 在 2021年2月1日,10:42,沉醉寒風 <1039601...@qq.com> 写道:
>
> streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
Hi,macdoor
很有意思的case,p1.time字段是你记录里的时间吗? 你hive表的分区字段和这个时间字段的关系是怎么样的呀?
> 在 2021年1月30日,17:54,macdoor 写道:
>
> 具体需求是这样,采集取得的通道总流量5分钟一次存入 hive 表,为了取得 5 分钟内该通道的流量,需要前后2次采集到的总流量相减,我想用同一个 hive
> 表自己相互 join,形成 2 个 hive 流 join,不知道是否可以实现?或者有其他实现方法吗?
> 我现在使用 crontab 定时 batch 模式做,希望能改成 stream 模式
>
>
谢谢回复!
也就是说如果我利用Flink从Kafka (Select
*)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
best,
amenhub
发件人: tison
发送时间: 2021-02-01 10:36
收件人: user-zh
主题: Re: 水印的作用请教
取决于你的计算流图,watermark 通常只在以下情况有实际作用
True
& cond 1. 使用 EventTime
& cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义
streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
flink sql+8,??
取决于你的计算流图,watermark 通常只在以下情况有实际作用
True
& cond 1. 使用 EventTime
& cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
Best,
tison.
amenhub 于2021年2月1日周一 上午10:26写道:
> hi everyone,
>
> 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
>
>
> 在 2021年1月31日,20:15,Appleyuchi 写道:
>
> 一点小小的建议哈,
> 目前flink社区讨论主要还是java/scala为主,
> 如果执意使用pyflink的话,后续极有可能会遇到较大的阻力.
我理解这种较大阻力应该不存在的,社区里pyflink的投入还挺大的,蛮多开发者的,我也cc两位在这块做贡献的社区开发者,从JIRA上看pyflink相关的开发进度都挺快的。
如果有机器学习,python相关的经验,用pyflink我觉得挺合适的。
祝好,
Leonard
hi everyone,
最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
那么,
1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
Hi Colletta,
This error is kind of expected if the JobMaster / ResourceManager does not
maintain a stable connection to the ZooKeeper service, which may be caused
by network issues, GC pause, or unstable ZK service availability.
By "similar issue", what I meant is I'm not aware of any issue
K8s is self managed on ec2 nodes
After submitting the job and getting an exception I checked:
1.ssh into the machine and verify using the cli the pod has access.
2. In the job main method I instantiate a s3 client from the sdk (once with
default credential chain and once with access key and
Hello,
I think this would solve our problem.
We are also looking at supporting affinity rules, and it would also cover it.
I'm going to try to find some time this week to try your patch.
Thanks
From: Yang Wang
Sent: Friday, January 29, 2021 6:20 PM
To: Emilien
,
----
??:
"user-zh"
你好,图片在上传在附件里面了
hezongji...@qq.com
发件人: tison
发送时间: 2021-02-01 09:31
收件人: user-zh
主题: Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
邮件列表不支持直接粘贴图片,请尝试使用附件或 gist 等方式共享。
Best,
tison.
hezongji...@qq.com 于2021年2月1日周一 上午9:28写道:
>
有时候这种job持续2个多小时,我只能cancel job,但无法正常 cancel,都会导致 taskmanager 挂掉,错误如下
2021-01-31 23:04:23,677 ERROR
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did
not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
within
邮件列表不支持直接粘贴图片,请尝试使用附件或 gist 等方式共享。
Best,
tison.
hezongji...@qq.com 于2021年2月1日周一 上午9:28写道:
> 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
> 代码如下:
>
> 运行结果如下:
> --
> hezongji...@qq.com
>
打开了 debug 级别的日志,有这样的错误
2021-01-31 20:45:30,364 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager [] -
Released partition dc8a2804b6df6b0ceaee2610ccf6c6e5#312 produced by
448c5ac36dcda818f56ec5bbd728da10.
2021-01-31 20:45:30,392 DEBUG
为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
代码如下:
运行结果如下:
hezongji...@qq.com
Oh, nvm, that's the "Persisted" part which is documented as "*Persisted
in-flight data*: The number of bytes persisted during the alignment (time
between receiving the first and the last checkpoint barrier) over all
acknowledged subtasks. This is > 0 only if the unaligned checkpoints are
enabled."
Got it, thanks! What is the 0 B part of that?
On Sun, Jan 31, 2021 at 3:43 AM Arvid Heise wrote:
> Processed in-flight data is the size of data that is processed between the
> first and last checkpoint barrier in aligned checkpointing. [1]
>
> [1]
>
Hi Oran,
How is that k8s deployed? Are you sure all nodes have the same IAM role?
can you try and see if this is fixed by granting permissions to that bucket
to the IAM role in use?
On Sun, Jan 31, 2021 at 5:15 PM OranShuster wrote:
> I made some more tests and the issue is still not resolved
I made some more tests and the issue is still not resolved
Since the submitted job main method is executed before the execution graph
is submitted i added the aws sdk as an dependency and used it to upload
files to the bucket in the main method
Once with the default credentials provider, this
周期性batch mode 从 hive 提取数据插入 mysql,每批次 10K 到 20K 行数据,多数情况下
10-20秒可以完成,但不定期就会很长时间,能达到 20多分钟,但也能成功,查看了日志也看不到错误,检查 mysql 也没有发现锁表,怀疑 hive
metastore 的性能,但也没看出问题。
请教分析思路,从 flink 上能看出job 在等待什么吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
一点小小的建议哈,
目前flink社区讨论主要还是java/scala为主,
如果执意使用pyflink的话,后续极有可能会遇到较大的阻力.
在 2021-01-31 14:26:55,"瞿叶奇" <389243...@qq.com> 写道:
>您好,我是国网陕西采集系统开发人员,我们在架构改造中,准备使用pyflink
>解决实时Kafka数据写HDFS的问题,我的Kafka集群存在kerberos安全认证,导致我现在还没连接上,能不能给一个样例呢?
Processed in-flight data is the size of data that is processed between the
first and last checkpoint barrier in aligned checkpointing. [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html#history-tab
On Sun, Jan 31, 2021 at 7:45 AM Rex Fenley
46 matches
Mail list logo