[ https://issues.apache.org/jira/browse/FLINK-7424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek closed FLINK-7424.
-----------------------------------
    Resolution: Not A Problem

This is working as intended, but badly documented. Please reopen if you disagree.

> `CEP` component make `KeyedStream` choose wrong channel
> -------------------------------------------------------
>
>                 Key: FLINK-7424
>                 URL: https://issues.apache.org/jira/browse/FLINK-7424
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, Streaming
>            Reporter: Benedict Jin
>            Assignee: Benedict Jin
>
> The `CEP` component makes `KeyedStream` choose the wrong channel.
> The original KeySelector works correctly:
> {code:java}
> public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
>     return (KeySelector<HBaseServerLog, Integer>) log -> {
>         if (log == null) return 0;
>         Integer flumeId;
>         if ((flumeId = log.getFlumeId()) == null) return 1;
>         return flumeId;
>     };
> }
> {code}
> After the following change, it throws a "Key group index out of range of key group
> range [16, 32)" exception:
> {code}
> public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
>     return new KeySelector<HBaseServerLog, Integer>() {
>         private Random r = new Random(System.nanoTime());
>         @Override
>         public Integer getKey(HBaseServerLog log) throws Exception {
>             if (log == null) return 0;
>             Integer flumeId;
>             if ((flumeId = log.getFlumeId()) == null) return 1;
>             return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
>         }
>     };
> }
> {code}
> However, after the MathUtils.murmurHash(keyHash) % maxParallelism step, the key
> group should still be valid. In practice, once some `CEP` code
> (IterativeCondition/PatternFlatSelectFunction) is added after the keyed stream,
> the KeySelector chooses the wrong channel and an IllegalArgumentException is thrown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
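
The resolution does not spell out why this is intended behaviour. A likely explanation, stated here as an assumption rather than taken from the issue: the KeySelector is evaluated once on the sending side to pick the output channel and again on the receiving side when a keyed operator (such as the CEP operator) accesses keyed state, so a selector that returns a random key can produce a key group outside the receiving subtask's range. The sketch below simulates this in plain Java. `KeyGroupSketch`, `scramble`, and the other helper names are hypothetical; `scramble` is a stand-in for MathUtils.murmurHash and not Flink's implementation, and the parallelism of 8 with a max parallelism of 128 is an illustrative choice under which subtask 1 owns key groups [16, 32).

```java
import java.util.Random;

public class KeyGroupSketch {

    // Stand-in for MathUtils.murmurHash (assumption: any deterministic,
    // non-negative hash is enough to illustrate the effect).
    static int scramble(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE;
    }

    // Key -> key group, following the "hash % maxParallelism" step quoted in the issue.
    static int keyGroupFor(int key, int maxParallelism) {
        return scramble(Integer.hashCode(key)) % maxParallelism;
    }

    // Key group -> subtask index; each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Deterministic selector, like the first version in the issue.
    static int stableKey(Integer flumeId) {
        return flumeId == null ? 1 : flumeId;
    }

    // Randomized selector, like the second version in the issue.
    static int randomizedKey(Integer flumeId, int parallelism, Random r) {
        if (flumeId == null) return 1;
        return Math.max(flumeId + (r.nextBoolean() ? 0 : -r.nextInt(parallelism)), 0);
    }

    // Evaluates the selector twice per record (sender, then receiver) and counts
    // how often the two evaluations disagree about the owning subtask.
    static int countMismatches(boolean randomized, int records,
                               int parallelism, int maxParallelism, long seed) {
        Random r = new Random(seed);
        int mismatches = 0;
        for (int i = 0; i < records; i++) {
            Integer flumeId = i % 50;
            int sendKey = randomized ? randomizedKey(flumeId, parallelism, r) : stableKey(flumeId);
            int recvKey = randomized ? randomizedKey(flumeId, parallelism, r) : stableKey(flumeId);
            int sendSubtask = subtaskFor(keyGroupFor(sendKey, maxParallelism), parallelism, maxParallelism);
            int recvSubtask = subtaskFor(keyGroupFor(recvKey, maxParallelism), parallelism, maxParallelism);
            if (sendSubtask != recvSubtask) mismatches++;
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("stable selector mismatches:     "
                + countMismatches(false, 1000, 8, 128, 42L));
        System.out.println("randomized selector mismatches: "
                + countMismatches(true, 1000, 8, 128, 42L));
    }
}
```

With the deterministic selector the two evaluations always agree. With the randomized one they frequently disagree, which is the situation in which a key-group range check on the receiving side would fail. If the goal of the randomization was to spread load, one option is to compute the random component once, store it on the record, and key by that stored value.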