Hi Dylan,
I have reproduced your issue based on your code.
Currently Flink does not support such a nested correlate pattern query.
I have created an issue to track this [1].
Thanks for reporting it and for your help.
[1] https://issues.apache.org/jira/browse/FLINK-20255
Best,
Godfrey
Dylan Forciea
Hi all,
I've run into a problem using Flink's Catalog partition-listing method Catalog#listPartitions(ObjectPath,
CatalogPartitionSpec).
I noticed that the partitionSpec field storing the partition information in CatalogPartitionSpec is of type Map. Wouldn't a different type be more reasonable and more general?
Thanks
I still haven't fully understood. Do you mean you can't infer the timestamp
in source A because it depends on some internal field of source B?
How is that actually working in a parallel setting? Which timestamp is used
in the different instances of a source?
Say, we have task A1 which is the
Hi Flavio,
if it arrives in the java process then you are doing everything right
already (or almost).
Are you shading the mysql connector? I suspect the property then gets
shaded too. You could decompile your jar to be sure. Have you verified
that this is working as intended without
1. Without the initial full data there may be problems.
How should I understand this? By default we consume from the Kafka group offsets; how can full data be guaranteed?
Our binlog sync is incremental only; it never does an initial full sync.
2. The before row is sent first, then the after row.
3. The data in Kafka is hashed by the MySQL primary key id, so it is ordered, but all records for a group key are not
guaranteed to be in the same partition, since the hash is by primary key id
On 2020-11-20 13:25:53, "Jark Wu" wrote:
>1. Without the initial full data there may be problems.
>
>3. Your
1. Without the initial full data there may be problems.
3. When your format parses an update, does it emit the before row or the after row first?
4. Is your data ordered in Kafka? That is, are all records for the same key in one partition?
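The partitioning question above can be made concrete with a small sketch (plain Python; the field names and the partitioner are illustrative, not Kafka's actual partitioner): a deterministic hash partitioner keeps all records of one key in one partition, so per-key order is preserved, but if the hash key (the primary key id) differs from the group key (e.g. spu_id), records sharing a group key may be spread across partitions.

```python
def partition_for(key, num_partitions):
    """Deterministic partitioner: the same key always maps to the same
    partition, so records for one key stay ordered within that partition."""
    return hash(key) % num_partitions

# Records hashed by primary key `id`: per-id order is preserved, but records
# sharing a group key (spu_id here) may land in different partitions.
records = [
    {"id": 1, "spu_id": 100},
    {"id": 2, "spu_id": 100},
    {"id": 1, "spu_id": 100},
]
partitions = {}
for r in records:
    partitions.setdefault(partition_for(r["id"], 4), []).append(r)
```

Both id=1 records end up in the same partition, while the id=2 record with the same spu_id may not, which is exactly the ordering concern raised above.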
On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote:
>
> 1. Correct. When the job starts it is stateless, and then gradually accumulates state.
>
> 2. Not enabled.
>
> On 2020-11-20 11:49:44, "Jark Wu" wrote:
>
>
Hi sparklelj,
With a global window, all elements with the same key go into one window; it has no window end, so you need to
implement a custom trigger to fire the window computation [1].
It is a keyed window, so it is not limited to a single window instance (only windowAll has a single window instance).
So please check whether the usage is wrong: which interface does your 'StreamToBatchWindow' class implement?
[1]
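A sketch of what such a custom trigger decides, assuming a simple count-based firing condition (plain Python illustrating the decision logic only, not Flink's actual Trigger interface, which has onElement/onEventTime/onProcessingTime methods returning a TriggerResult):

```python
class CountTrigger:
    """Sketch of the decision a custom trigger makes for a global window:
    fire once `max_count` elements have arrived for a key, then reset.
    In Flink the per-key count would live in trigger state, not a dict."""

    def __init__(self, max_count):
        self.max_count = max_count
        self.counts = {}  # per-key element count

    def on_element(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1
        if self.counts[key] >= self.max_count:
            self.counts[key] = 0  # reset, like TriggerResult.FIRE_AND_PURGE
            return "FIRE_AND_PURGE"
        return "CONTINUE"
```

Without such a trigger the global window never fires, because there is no window end to trigger the computation automatically.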
1. Correct. When the job starts it is stateless, and then gradually accumulates state.
2. Not enabled.
On 2020-11-20 11:49:44, "Jark Wu" wrote:
>There should be nothing wrong with the implementation.
>
>1. Does your CDC data not include the full snapshot, i.e. did you start reading directly from some day's incremental data?
>2. Is mini-batch enabled?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote:
>
>> hi Jark:
>>
>>
>> Added some logging and checked the aggregation-related fields: v_spu_id
Can you grep to see which jars contain these 2 classes?
On 2020-11-20 08:51:59, "m13162790856" wrote:
>HI:
> It is intermittent, but recently it has been happening fairly often. I suspected the classloader before. My jar is
> built with dependencies separated and does not bundle any packages, so I can be sure every start uses the same jars; this situation is very strange.
>
>
>On 2020-11-19 17:14, hailongwang<18868816...@163.com> wrote:
>
>
>Hi, if this only happens occasionally, it should be related to class loading. If
There should be nothing wrong with the implementation.
1. Does your CDC data not include the full snapshot, i.e. did you start reading directly from some day's incremental data?
2. Is mini-batch enabled?
Best,
Jark
On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote:
> hi Jark:
>
>
> Added some logging and checked the aggregation-related fields: v_spu_id, leaving_num, price
> none of them is ever NULL, and checking in sql-client shows no NULLs either
>
> The custom format logic is similar to canal's: insert update delete
Hi George,
If your PVCs can be mounted ReadWriteMany [1], then I think Flink could be
deployed on these PVs.
However, with high availability enabled you still need a distributed
coordination system (ZooKeeper,
or the newly introduced Kubernetes HA [2]) for leader election/retrieval
and meta
hi Jark:
Added some logging and checked the aggregation-related fields: v_spu_id, leaving_num, price.
None of them is ever NULL, and checking in sql-client shows no NULLs either.
The custom format logic is similar to canal's (insert, update, delete): an update message needs to emit 2 rows,
update_before and update_after. That is how the format logic should be written, right?
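The update_before/update_after expansion described above can be sketched as follows (plain Python; the 'op', 'before', and 'after' field names are illustrative and not the spec of any real format):

```python
def expand_change_event(event):
    """Expand one CDC event into changelog rows, the way canal-like formats
    do: an update becomes UPDATE_BEFORE (old row) then UPDATE_AFTER (new row)."""
    op = event["op"]
    if op == "insert":
        return [("INSERT", event["after"])]
    if op == "delete":
        return [("DELETE", event["before"])]
    if op == "update":
        return [("UPDATE_BEFORE", event["before"]),
                ("UPDATE_AFTER", event["after"])]
    raise ValueError(f"unknown op: {op}")
```

Emitting the before row first lets a downstream aggregation retract the old value before applying the new one.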
On 2020-11-19 23:13:19, "Jark Wu" wrote:
>You can first run select * from hive.database.table; directly
Thanks!
Update: We've confirmed with a test copy of our data now that if we remove
all the null values from arrays everything works smoothly and as expected.
So this definitely appears to be the culprit.
On Thu, Nov 19, 2020 at 6:41 PM Jark Wu wrote:
> Thanks Rex! This is very helpful. Will
I looked at the keyBy code in Flink 1.11: it uses a KeySelector to extract the key, but it can only return a single
field, not several, which means you can only group by one field at a time (e.g. test.keyBy(t ->
t.f0)). How do I group by multiple fields?
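In Flink 1.11 the KeySelector returns a single key object, so one way to group by several fields is to return a composite key (for example a tuple of the fields, or a POJO with proper equals/hashCode). A minimal sketch of the idea, in plain Python rather than Flink's Java API (group_by and the field names are illustrative):

```python
def key_selector(record):
    """Composite key over two fields -- the analogue of a Flink KeySelector
    returning one key object that combines both fields."""
    return (record["f0"], record["f1"])

def group_by(records, selector):
    """Group records by the key the selector extracts, the way keyBy
    routes records sharing a key to the same task."""
    groups = {}
    for r in records:
        groups.setdefault(selector(r), []).append(r)
    return groups
```

Records agreeing on both f0 and f1 land in the same group, which is the multi-field grouping asked about above.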
-----Original Message-----
From: guanxianchun
Sent: 2020-11-19 20:53
To: user-zh@flink.apache.org
Subject: Re: keyBy operator aggregating on multiple fields in Flink 1.10 and Flink 1.11
I checked with the following json:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
Thanks for your help!
Now the timestamps travel with the records in the stream. My streaming
pipeline is like this:
source -> parser --shuffle--> join -> sink
Streams A and B go through this pipeline; I keep records from
stream A in an in-memory cache (LinkedHashMap) in the join operator, then
I'm reading your response as rocksdb having to seek across the whole
dataset for the whole table, which we hope to avoid.
What are the rules for the unique key and unique join key inference? Maybe
we can reorganize our plan to allow it to infer unique keys more correctly.
Thanks
On Wed, Nov 18,
Thanks Rex! This is very helpful. Will check it out later.
On Fri, 20 Nov 2020 at 03:02, Rex Fenley wrote:
> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see in our DB
Hi Niklas,
We dropped the Flink ML lib in 1.9 and plan to replace it with a new
machine learning library for traditional machine learning algorithms. And
that library will be based on FLIP-39. The plan was pushed back a little
bit because we plan to deprecate the DataSet API but haven't got the
Hello, I have some questions about global windows. A global window is globally unique, i.e. all elements flow into
this single window instance, so what is the point of a preceding keyBy? Everything flows into one window anyway. My
test seems to confirm this: I defined a custom global window and set the parallelism of process, but there really was only one window.
Example:
dataUnion.keyBy(0).window(new StreamToBatchWindow()).process(new
flink-1.11 uses a KeySelector
--
Sent from: http://apache-flink.147419.n8.nabble.com/
HI:
It is intermittent, but recently it has been happening fairly often. I suspected the classloader before. My jar is
built with dependencies separated and does not bundle any packages, so I can be sure every start uses the same jars; this situation is very strange.
On 2020-11-19 17:14, hailongwang<18868816...@163.com> wrote:
Hi, if this only happens occasionally, it should be related to class loading. If
`org.apache.kafka.common.serialization.ByteArrayDeserializer` was loaded by the child classloader
while
@Arvid thanks, will try that. The NFS server I am using should be able to
handle the throughput. From my observation the serde is taking most of the CPU.
@Yun Tang
Please find the logs. Also, what are your thoughts about the source task data
gen causing this, i.e. pushing the checkpoint to the JM instead of the filesystem?
Hi,
Thanks for reaching out!
First of all, I would like to point out that an interesting
alternative to the per-job cluster could be running your jobs in
application mode [1].
Given that you want to run arbitrary SQL queries, I do not think you
can "share" across queries the part of the job
the properties arrive at the task manager, because I can see them in the
java process (using ps aux)... or do you mean some special line of code?
On Thu, 19 Nov 2020 at 20:53, Arvid Heise wrote:
> Hi Flavio,
>
> you are right, all looks good.
>
> Can you please verify if the properties arrived
Hi there,
Can flink be deployed to PVCs backed by block storage? It seems the
only option is blob storage today.
Thanks,
George
Hi Flavio,
you are right, all looks good.
Can you please verify if the properties arrived at the task manager in the
remote debugger session? For example, you could check the JVisualVM
Overview tab.
On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier
wrote:
> At the moment I use a standalone
Hi Niklas,
indeed some efforts on the machine learning libraries are pushed back in
favor of getting proper PyTorch and Tensorflow support through PyFlink.
Native implementations in Flink have been done so far in the DataSet API,
which is going to be deprecated in the next few releases in favor of
At the moment I use a standalone cluster, isn't using env.java.opts the
right way to do it?
On Thu, 19 Nov 2020 at 20:11, Arvid Heise wrote:
> Hi Flavio,
>
> -D afaik passes only the system property to the entry point (client or
> jobmanager depending on setup), while you probably want to have
Hi Si-li,
slot sharing is indeed the way that Flink performs co-location. It's
actually enabled by default. It should work as expected if upstream and
downstream operators have the same parallelism.
In certain cases, two operators can be even chained into one task where no
serialization/network
Hi Si-li,
couldn't you also add the timestamp as state to the source? The state
would then store the timestamp of the last emitted record.
It's nearly identical to your solution but would fit the recovery model of
Flink much better.
If you want to go further back to account for the records that
Glad to hear that you worked it out.
Indeed, the path has to be accessible by the worker nodes. A common
solution is also to put it on some DFS like HDFS and reference that. Then
you only need to update one file if the key changes.
On Thu, Nov 19, 2020 at 2:14 AM Fanbin Bu wrote:
> i have to
Hi Slim,
for your initial question concerning the size of _metadata. When Flink
writes the checkpoint, it assumes some kind of DFS. Pretty much all known
DFS implementations behave poorly for many small files. If you run a job
with 5 tasks and parallelism of 120, then you'd get 600 small
Hi Flavio,
-D afaik passes only the system property to the entry point (client or
jobmanager depending on setup), while you probably want to have it on the
task managers.
The specific options to pass it to the task managers depend on the way you
deploy. -yD for yarn for example. For docker or
Hi Jiazhi,
you can use a rich function and query all static data in open [1] as you'd
do it in Java if you want to load the data into main memory. If you want to
dynamically query the database (enriching a record), you should use Async
IO instead. [2]
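The load-once-then-enrich pattern described above (a rich function querying all static data in open()) can be sketched outside Flink like this (plain Python; the class, the city fields, and the loader callable standing in for the database query are all illustrative):

```python
class EnrichFunction:
    """Mimics a Flink rich function lifecycle: open() loads static data once,
    map() then enriches each record against it."""

    def __init__(self, loader):
        self.loader = loader      # stands in for the DB query
        self.static_data = None

    def open(self):
        # Called once before any record is processed, like RichFunction.open().
        self.static_data = self.loader()

    def map(self, record):
        enriched = dict(record)
        enriched["city_name"] = self.static_data.get(record["city_id"], "unknown")
        return enriched
```

Loading in open() keeps the per-record path free of database calls; if the data is too large or changes often, the Async I/O route mentioned above is the better fit.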
Alternatively, you can also use the data
Below is a highly redacted set of data that should represent the problem.
As you can see, the "roles" field has "[null]" in it, a null value within
the array. We also see in our DB corresponding rows like the following.
id       | roles
---------+--------
16867433 | {NULL}
We have
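Until the parsing issue is fixed, one possible workaround is to strip null elements from array-valued fields before the records reach the connector. A sketch (the 'roles' field name comes from the sample above; the helper itself is hypothetical):

```python
def strip_null_array_elements(record, array_fields=("roles",)):
    """Drop null elements inside the given array-valued fields of one record.
    A pre-ingestion workaround sketch for formats that cannot parse nulls
    inside arrays; other fields pass through unchanged."""
    cleaned = dict(record)
    for field in array_fields:
        value = cleaned.get(field)
        if isinstance(value, list):
            cleaned[field] = [v for v in value if v is not None]
    return cleaned
```

Running this over the rows above turns a `{NULL}` roles array into an empty array while leaving the id untouched.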
Yeah there is no wildcard hostname it can be using.
Went ahead and started the implementation for the start up wrapper, but just
realized after generating the key-cert pair in the JM wrapper, we will need to
ping back to the client with the cert.
Another question I have is, currently we are
Ah yes, missed the kafka part and just saw the array part. FLINK-19771
definitely was solely in the postgres-specific code.
Dylan
From: Jark Wu
Date: Thursday, November 19, 2020 at 9:12 AM
To: Dylan Forciea
Cc: Danny Chan , Rex Fenley , Flink
ML
Subject: Re: Filter Null in Array in SQL
You're right... I removed my flink dir, re-extracted it, and now it
works. Unfortunately I didn't keep the old version to understand what
the difference was, but the error was probably caused by the fact that
I had a previous version of the WordCount.jar (without the listener)
in the flink lib
You can first run select * from hive.database.table; directly and check whether every field's value is correct
and whether any are null, to verify that your custom format is fine.
Best,
Jark
On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote:
> -- MySQL table
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>`id` INT UNSIGNED AUTO_INCREMENT,
>`spu_id`
Many thanks for the Help!!
Simone
From: Aljoscha Krettek
Sent: 19 November 2020 11:46
To: user@flink.apache.org
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
On 17.11.20 17:37, Simone Cavallarin wrote:
> Hi,
>
> I have been working on the
Hi Dylan,
I think Rex encountered another issue, because he is using Kafka with
Debezium format.
Hi Rex,
If you can share the json data and the exception stack, that would be
helpful!
Besides, you can try to enable 'debezium-json.ignore-parse-errors' option
[1] to skip the dirty data.
Best,
-- MySQL table
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
`id` INT UNSIGNED AUTO_INCREMENT,
`spu_id` BIGINT NOT NULL,
`leaving_price` DECIMAL(10, 5),
PRIMARY KEY ( `id` ),
unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8
-- Flink table
CREATE
Do you mean that the array contains values that are null, or that the entire
array itself is null? If it’s the latter, I have an issue written, along with a
PR to fix it that has been pending review [1].
Regards,
Dylan Forciea
[1] https://issues.apache.org/jira/browse/FLINK-19771
From: Danny
Hi Iacovos,
As Matthias mentioned tasks' off-heap has nothing to do with the memory
segments. This memory component is reserved only for the user code.
The memory segments are managed by Flink and used for batch workloads, like
in memory joins etc.
They are part of managed memory
I also tried 1.11.0 and 1.11.2, both work for me.
On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek
wrote:
> Hmm, there was this issue:
> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
> in your version.
>
> On 19.11.20 12:58, Flavio Pompermaier wrote:
> > Which version
Hmm, there was this issue:
https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
in your version.
On 19.11.20 12:58, Flavio Pompermaier wrote:
Which version are you using?
I used the exact same commands on Flink 1.11.0 and I didn't get the job
listener output..
Il gio 19
Hi Andrey,
Thank you for your response. I created
https://issues.apache.org/jira/browse/FLINK-20244.
Best Regards,
Thomas
From: Andrey Zagrebin
Sent: Thursday, November 19, 2020 8:41
To: Thomas Eckestad
Cc: user@flink.apache.org
Subject: Re: Strange behaviour
Hi Flink-Community,
I'm digging through the history of FlinkML and FLIP-39 [0]. What I understood
so far is that FlinkML was removed in 1.9 because it had become unmaintained.
I can't really find out whether FLIP-39, providing a replacement for
FlinkML, is currently being worked on. The
Which version are you using?
I used the exact same commands on Flink 1.11.0 and I didn't get the job
listener output..
On Thu, 19 Nov 2020 at 12:53, Andrey Zagrebin wrote:
> Hi Flavio and Aljoscha,
>
> Sorry for the late heads up. I could not actually reproduce the reported
> problem with
Hi Aljoscha, in my main class, within the jar, I create the env and call
env.execute(). The listener is not called if the job is run by the CLI
client or FlinkRestClient; I don't see anything on the job manager or task
manager. To me this is a bug, and you can verify it by attaching a listener to
Hi Flavio and Aljoscha,
Sorry for the late heads up. I could not actually reproduce the reported
problem with 'flink run' and local standalone cluster on master.
I get the expected output with the suggested modification of WordCount
program:
$ bin/start-cluster.sh
$ rm -rf out; bin/flink run
Hi
Flink only has the slotSharingGroup API on the DataStream class; I can't find any
public API to achieve co-location constraints. Could anyone provide an
example?
Another question: if I use a slot sharing group, it is possible that Flink will
schedule two subtasks to the same slot. I think such a schedule
On 17.11.20 17:37, Simone Cavallarin wrote:
Hi,
I have been working on the suggestion that you gave me, thanks! The first part is to add to the
message the gap. 1)I receive the event, 2)I take that event and I map it using
StatefulsessionCalculator, that is where I put together "The
Thanks! It's good to see that it is helpful to you.
Best,
Aljoscha
On 18.11.20 18:11, Dongwon Kim wrote:
Hi Aljoscha,
Unfortunately, it's not that easy right now because normal Sinks that
rely on checkpointing to write out data, such as Kafka, don't work in
BATCH execution mode because we
JobListener.onJobExecuted() is only invoked in
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
of these is still in the call chain with that setup then the listener
will not be invoked.
Also, this would only happen on the client, not on the broker (in your
case) or
Hi all:
In Flink 1.10.x the keyBy operator can group by several fields at once, e.g. map.keyBy(0, 1), but in 1.11.x
this form is deprecated. Looking at the source it seems grouping by multiple fields is no longer supported? Or is there some other way?
How can I group by multiple fields?
Any pointers appreciated!
Can you also share your problematic json string here? So that we can
determine the specific cause of the error.
On Thu, 19 Nov 2020 at 14:51, Rex Fenley wrote:
> Hi,
>
> I recently discovered some of our data has NULL values arriving in an
> ARRAY column. This column is being consumed by Flink via the Kafka
>
Hi,
If this only happens occasionally, it should be related to class loading.
If `org.apache.kafka.common.serialization.ByteArrayDeserializer` was loaded by the child
classloader
while `org.apache.kafka.common.serialization.Deserializer` was loaded by the parent
classloader, there will be a problem.
Does your jar bundle the kafka-connector? If the cluster already provides it, try marking it provided.
Hope this helps.
Best,
Hailong
I have a spring boot job server that acts as a broker between our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (which, if I'm not wrong, is also the one used by the CLI
client for the run action). However, neither method triggers the job
listener.
On
@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your job
via "bin/flink run ...", right?
What's the exact invocation of "bin/flink run" that you're using?
On 19.11.20 09:29, Andrey Zagrebin wrote:
Hi
Hi Flavio,
I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.
Best,
Andrey
On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier
wrote:
> is this a bug or is it
Hi,
One thing to clarify first:
I think the "Closing the Kafka producer with timeoutMillis =
9223372036854775807 ms" log doesn't necessarily mean that a producer was
closed due to timeout (Long.MAX_VALUE).
I guess that is just a Kafka log message that is logged when a Kafka
producer is closed
Hi, Fenley ~
You are right, parsing nulls in an ARRAY field is not supported now. I have
logged an issue [1] and will fix it soon ~
[1] https://issues.apache.org/jira/browse/FLINK-20234
On Thu, 19 Nov 2020 at 14:51, Rex Fenley wrote:
> Hi,
>
> I recently discovered some of our data has NULL values arriving