Sure! (Aside: it turns out that the issue was using an `Array[Byte]` as a key - on the JVM, arrays use identity-based equals and hashCode, so two byte arrays with identical contents hash to different values, and keyBy can't reliably route them to the same subtask. A quick demonstration of that behavior is below, followed by the skeleton for completeness.)
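To see the underlying behavior, here's a minimal REPL sketch (not from my job - the names are made up for the demo, assuming Scala 2.x on the JVM):

    // Arrays on the JVM use reference equality and identity hashCode,
    // so equal contents do not imply equal keys.
    val a = Array[Byte](1, 2, 3)
    val b = Array[Byte](1, 2, 3)

    a == b                       // false - reference comparison
    a.hashCode == b.hashCode     // false in practice - identity hashes

    // Strings have value-based equals/hashCode, which is what keyBy needs.
    // Caveat: new String(bytes) decodes with the platform default charset;
    // if the rowKey bytes aren't valid text, a wrapper keyed on
    // java.util.Arrays.hashCode/equals semantics would be safer.
    val ka = new String(a)
    val kb = new String(b)

    ka == kb                     // true
    ka.hashCode == kb.hashCode   // true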
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(Config.callAggregator.parallelism)

env.addSource(kafkaSource)
  .flatMap(transformToRecords(_))
  .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
  .map(new StatefulAggregator())
  .addSink(hbaseSink)

Again, wrapping my keyBy function in `new String()` has fixed my issue.

Thanks!

-a

> On Dec 7, 2016, at 11:28 AM, Stefan Richter <s.rich...@data-artisans.com>
> wrote:
>
> Hi,
>
> could you maybe provide the (minimal) code for the problematic job? Also, are
> you sure that the keyBy is working on the correct key attribute?
>
> Best,
> Stefan
>
>> On 07.12.2016, at 15:57, Andrew Roberts <arobe...@fuze.com> wrote:
>>
>> Hello,
>>
>> I'm trying to perform a stateful mapping of some objects coming in from
>> Kafka in a parallelized Flink job (set on the job using
>> env.setParallelism(3)). The data source is a Kafka topic, but the partitions
>> aren't meaningfully keyed for this operation (each Kafka message is
>> flatMapped to between 0-2 objects, with potentially different keys). I have
>> a keyBy() operator directly before my map(), but I'm seeing objects with the
>> same key distributed to different parallel task instances, as reported by
>> getRuntimeContext().getIndexOfThisSubtask().
>>
>> My understanding of keyBy is that it would segment the stream by key, and
>> guarantee that all data with a given key would hit the same instance. Am I
>> possibly seeing residual "keying" from the Kafka topic?
>>
>> I'm running Flink 1.1.3 in Scala. Please let me know if I can add more info.
>>
>> Thanks,
>>
>> Andrew
>