I commented on the issue with a way that should work.

On Fri, 9 Dec 2016 at 01:00 Chesnay Schepler <ches...@apache.org> wrote:
> Done. https://issues.apache.org/jira/browse/FLINK-5299
>
> On 08.12.2016 16:50, Ufuk Celebi wrote:
> > Would you like to open an issue for this for starters, Chesnay? Would be
> > good to fix for the upcoming release, even.
> >
> > On 8 December 2016 at 16:39:58, Chesnay Schepler (ches...@apache.org) wrote:
> >> It would be neat if we could support arrays as keys directly; it should
> >> boil down to checking the key type and, in the case of an array, injecting
> >> a KeySelector that calls Arrays.hashCode(array).
> >> This worked for me when I ran into the same issue while experimenting
> >> with some stuff.
> >>
> >> The batch API can use arrays as keys as well, so it's also a matter of
> >> consistency, imo.
> >>
> >> Regards,
> >> Chesnay
> >>
> >> On 08.12.2016 16:23, Ufuk Celebi wrote:
> >>> @Aljoscha: I remember that someone else ran into this, too. Should we
> >>> address arrays as keys specifically in the API? Prohibit? Document this?
> >>>
> >>> – Ufuk
> >>>
> >>> On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:
> >>>> Sure!
> >>>>
> >>>> (Aside: it turns out that the issue was using an `Array[Byte]` as a
> >>>> key - byte arrays don’t appear to have a stable hashCode. I’ll provide
> >>>> the skeleton for completeness, though.)
> >>>>
> >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>>> env.setParallelism(Config.callAggregator.parallelism)
> >>>>
> >>>> env.addSource(kafkaSource)
> >>>>   .flatMap(transformToRecords(_))
> >>>>   .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
> >>>>   .map(new StatefulAggregator())
> >>>>   .addSink(hbaseSink)
> >>>>
> >>>> Again, wrapping my keyBy function in `new String()` has fixed my
> >>>> issue. Thanks!
> >>>>
> >>>> -a
> >>>>
> >>>>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> could you maybe provide the (minimal) code for the problematic job?
> >>>>> Also, are you sure that the keyBy is working on the correct key
> >>>>> attribute?
> >>>>>
> >>>>> Best,
> >>>>> Stefan
> >>>>>
> >>>>>> On 07.12.2016 at 15:57, Andrew Roberts wrote:
> >>>>>>
> >>>>>> Hello,
> >>>>>>
> >>>>>> I’m trying to perform a stateful mapping of some objects coming in
> >>>>>> from Kafka in a parallelized Flink job (set on the job using
> >>>>>> env.setParallelism(3)). The data source is a Kafka topic, but the
> >>>>>> partitions aren’t meaningfully keyed for this operation (each Kafka
> >>>>>> message is flatMapped to between 0-2 objects, with potentially
> >>>>>> different keys). I have a keyBy() operator directly before my map(),
> >>>>>> but I’m seeing objects with the same key distributed to different
> >>>>>> parallel task instances, as reported by
> >>>>>> getRuntimeContext().getIndexOfThisSubtask().
> >>>>>>
> >>>>>> My understanding of keyBy is that it would segment the stream by
> >>>>>> key, and guarantee that all data with a given key would hit the same
> >>>>>> instance. Am I possibly seeing residual “keying” from the Kafka
> >>>>>> topic?
> >>>>>>
> >>>>>> I’m running Flink 1.1.3 in Scala. Please let me know if I can add
> >>>>>> more info.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Andrew
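
For the archives: the root cause is that a Java byte[] (Scala's Array[Byte])
inherits identity-based hashCode from Object, and since Flink serializes
records between operators, the copy deserialized on the receiving side is a
new object with a new identity hash - so arrays with equal contents don't
route to the same subtask. Below is a minimal sketch of the two workarounds
discussed in the thread (not the exact code posted on FLINK-5299; the Record
type and value names are placeholders, not from the thread):

import java.nio.charset.StandardCharsets
import java.util.Arrays

// Placeholder for the record type in Andrew's skeleton; only the
// Array[Byte] row key matters here.
case class Record(rowKey: Array[Byte])

// Option 1 (Chesnay's suggestion): derive the key from the array's
// contents. java.util.Arrays.hashCode inspects the elements, unlike
// byte[].hashCode, which is identity-based, so equal contents always
// yield equal keys. Note that distinct arrays can still collide, which
// would merge their keyed state.
val byContentHash: Record => Int =
  r => Arrays.hashCode(r.rowKey)

// Option 2 (Andrew's fix, made charset-safe): build a String from the
// bytes. ISO-8859-1 maps every byte to exactly one char, so the mapping
// is lossless and collision-free; plain new String(bytes) uses the
// platform default charset and can silently mangle arbitrary binary data.
val byLatin1String: Record => String =
  r => new String(r.rowKey, StandardCharsets.ISO_8859_1)

// Usage in the job from the thread:
//   env.addSource(kafkaSource)
//     .flatMap(transformToRecords(_))
//     .keyBy(byLatin1String)   // or byContentHash
//     .map(new StatefulAggregator())
//     .addSink(hbaseSink)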