Re: flink-savepoint问题

2021-03-03 文章 guomuhua
我也遇到同样问题,为了打散数据,在keyby时加了随机数作为后缀,去掉随机数,可以正常savepoint,加上随机数就savepoint失败。所以如果确有要打散数据的需求,应该怎么处理呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-savepoint问题

2021-03-03 文章 guomuhua
我也遇到类似情况,为了打散数据,keyby加了随机数。请问怎么正确打散数据呢?
nobleyd wrote
> 是不是使用了随机key。

> guaishushu1103@

>  <

> guaishushu1103@

> > 于2021年3月3日周三 下午6:53写道:> checkpoint 可以成功保存,但是savepoint出现错误:>
> java.lang.Exception: Could not materialize checkpoint 2404 for operator>
> KeyedProcess (21/48).> at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)>
> 
> at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)>
> 
> at>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)>
> 
> at>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)>
> 
> at java.lang.Thread.run(Thread.java:745)> Caused by:
> java.util.concurrent.ExecutionException:>
> java.lang.IllegalArgumentException: Key group 0 is not in>
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at
> java.util.concurrent.FutureTask.report(FutureTask.java:122)> at
> java.util.concurrent.FutureTask.get(FutureTask.java:192)> at>
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)>
> 
> at> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.

> (OperatorSnapshotFinalizer.java:47)> at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)>
> 
> ... 3 more> Caused by: java.lang.IllegalArgumentException: Key group 0 is
> not in> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at>
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)>
> 
> at>
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)>
> 
> at>
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)>
> 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)> at>
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)>
> 
> ... 5 more>>> 

> guaishushu1103@

>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/

interval join 如何用 process time

2021-03-17 文章 guomuhua
在 flink sql 中,可以使用 proc time 来进行 interval join,但是在 stream api 中,只能用 event
time 进行 interval join,如何能使用 process time 呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: interval join 如何用 process time

2021-03-17 文章 guomuhua
你说的这个语法是flink sql 吧,我想问的是stream api 里面怎么用



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql count distonct 优化

2021-03-23 文章 guomuhua
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE;
或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
 set
table.optimizer.distinct-agg.split.bucket-num=1024;
还需要对应的将SQL改写为两段式吗?
例如:
原SQL:
SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,

对所需DISTINCT字段buy_id模1024自动打散后,SQL:
SELECT day, SUM(cnt) total
FROM (
SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day

还是flink会帮我自动改写SQL,我不用关心?

另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
 





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count distonct 优化

2021-03-25 文章 guomuhua
Jark wrote
> 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
> agg支持这个参数了。可以期待下。
> 
> Best,
> Jark
> 
> On Wed, 24 Mar 2021 at 19:29, Robin Zhang <

> vincent2015qdlg@

> >
> wrote:
> 
>> Hi,guomuhua
>>   开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>>
>> Best,
>> Robin
>>
>>
>> guomuhua wrote
>> > 在SQL中,如果开启了 local-global 参数:set
>> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> > 或者开启了Partial-Final 参数:set
>> table.optimizer.distinct-agg.split.enabled=true;
>> >  set
>> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> > 还需要对应的将SQL改写为两段式吗?
>> > 例如:
>> > 原SQL:
>> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>> >
>> > 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>> > SELECT day, SUM(cnt) total
>> > FROM (
>> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> > FROM T GROUP BY day, MOD(buy_id, 1024))
>> > GROUP BY day
>> >
>> > 还是flink会帮我自动改写SQL,我不用关心?
>> >
>> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
>> > <
>> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png>;
>>
>> >
>> >
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>

感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/