你发的代码就是原因,keySelector对于同一个输入数据,输出结果必须一致,不能不同。 如果需要实现类似效果,可以先使用 flatMap 生成 random key,然后将 key 存储到一个字段,比如就叫做 key,然后 keyBy("key") 这样可以。
junjie.m...@goupwith.com <junjie.m...@goupwith.com> 于2022年9月9日周五 17:59写道: > > > Integer[] rebalanceKeys = createRebalanceKeys(parallelism); > int rebalanceKeyIndex = new Random().nextInt(parallelism); > Integer key = rebalanceKeys[rebalanceKeyIndex]; > > /** > * 构建均衡 KEY 数组 > * > * @param parallelism 并行度 > * @return > */ > public static Integer[] createRebalanceKeys(int parallelism) { > HashMap<Integer, LinkedHashSet<Integer>> groupRanges = new > HashMap<>(); > int maxParallelism = > KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism); > // 构造多个 key 用于生成足够的 groupRanges > int maxRandomKey = parallelism * 10; > for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) { > int subtaskIndex = > KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, > maxParallelism, parallelism); > LinkedHashSet<Integer> randomKeys = > groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>()); > randomKeys.add(randomKey); > } > > Integer[] result = new Integer[parallelism]; > for (int i = 0; i < parallelism; i++) { > LinkedHashSet<Integer> ranges = groupRanges.get(i); > if (ranges == null || ranges.isEmpty()) { > throw new RuntimeException("create rebalance keys error"); > } > result[i] = ranges.stream().findFirst().get(); > } > return result; > } > > > 发件人: junjie.m...@goupwith.com > 发送时间: 2022-09-09 17:52 > 收件人: user-zh > 主题: Re: Re: Key group is not in KeyGroupRange > key selector中使用random.nextInt(parallelism) 有时会报错 > > From: yue ma > Date: 2022-09-09 17:41 > To: user-zh > Subject: Re: Key group is not in KeyGroupRange > 你好,可以看一下使用的 key selector 是否稳定,key 是否会变化。 > junjie.m...@goupwith.com <junjie.m...@goupwith.com> 于2022年9月9日周五 17:35写道: > > hi: > > 本人遇到了这个报错: > > Key group 51 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=127}. > > Unless you're directly using low level state access APIs, this is most > > likely caused by non-deterministic shuffle key (hashCode and equals > > implementation). > > > > 这个报错出现的莫名其妙,想了解这个错误是在什么样的情况下才会导致这个报错的? > > 谢谢!! > > > >