你发的代码就是原因,keySelector对于同一个输入数据,输出结果必须一致,不能不同。
如果需要实现类似效果,可以先使用 flatMap 生成 random key,然后将 key 存储到一个字段,比如就叫做 key,然后
keyBy("key") 这样可以。

junjie.m...@goupwith.com <junjie.m...@goupwith.com> 于2022年9月9日周五 17:59写道:
>
>
> Integer[] rebalanceKeys = createRebalanceKeys(parallelism);
> int rebalanceKeyIndex = new Random().nextInt(parallelism);
> Integer key = rebalanceKeys[rebalanceKeyIndex];
>
>  /**
>      * 构建均衡 KEY 数组
>      *
>      * @param parallelism 并行度
>      * @return
>      */
>     public static Integer[] createRebalanceKeys(int parallelism) {
>         HashMap<Integer, LinkedHashSet<Integer>> groupRanges = new 
> HashMap<>();
>         int maxParallelism = 
> KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
>         // 构造多个 key 用于生成足够的 groupRanges
>         int maxRandomKey = parallelism * 10;
>         for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
>             int subtaskIndex = 
> KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, 
> maxParallelism, parallelism);
>             LinkedHashSet<Integer> randomKeys = 
> groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
>             randomKeys.add(randomKey);
>         }
>
>         Integer[] result = new Integer[parallelism];
>         for (int i = 0; i < parallelism; i++) {
>             LinkedHashSet<Integer> ranges = groupRanges.get(i);
>             if (ranges == null || ranges.isEmpty()) {
>                 throw new RuntimeException("create rebalance keys error");
>             }
>             result[i] = ranges.stream().findFirst().get();
>         }
>         return result;
>     }
>
>
> 发件人: junjie.m...@goupwith.com
> 发送时间: 2022-09-09 17:52
> 收件人: user-zh
> 主题: Re: Re: Key group is not in KeyGroupRange
> key selector中使用random.nextInt(parallelism) 有时会报错
>
> From: yue ma
> Date: 2022-09-09 17:41
> To: user-zh
> Subject: Re: Key group is not in KeyGroupRange
> 你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。
> junjie.m...@goupwith.com <junjie.m...@goupwith.com> 于2022年9月9日周五 17:35写道:
> > hi:
> > 本人遇到了这个报错:
> > Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}.
> > Unless you're directly using low level state access APIs, this is most
> > likely caused by non-deterministic shuffle key (hashCode and equals
> > implementation).
> >
> > 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的?
> > 谢谢!!
> >
> >

回复