Sure!

(Aside, it turns out that the issue was using an `Array[Byte]` as a key - byte 
arrays don’t appear to have a stable hashCode. I’ll provide the skeleton for 
fullness, though.)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(Config.callAggregator.parallelism)

env.addSource(kafkaSource)
  .flatMap(transformToRecords(_))
  .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
  .map(new StatefulAggregator())
  .addSink(hbaseSink)


Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!

-a



> On Dec 7, 2016, at 11:28 AM, Stefan Richter <s.rich...@data-artisans.com> 
> wrote:
> 
> Hi,
> 
> could you maybe provide the (minimal) code for the problematic job? Also, are 
> you sure that the keyBy is working on the correct key attribute?
> 
> Best,
> Stefan
> 
>> Am 07.12.2016 um 15:57 schrieb Andrew Roberts <arobe...@fuze.com>:
>> 
>> Hello,
>> 
>> I’m trying to perform a stateful mapping of some objects coming in from 
>> Kafka in a parallelized flink job (set on the job using 
>> env.setParallelism(3)). The data source is a kafka topic, but the partitions 
>> aren’t meaningfully keyed for this operation (each kafka message is 
>> flatMapped to between 0-2 objects, with potentially different keys). I have 
>> a keyBy() operator directly before my map(), but I’m seeing objects with the 
>> same key distributed to different parallel task instances, as reported by 
>> getRuntimeContext().getIndexOfThisSubtask().
>> 
>> My understanding of keyBy is that it would segment the stream by key, and 
>> guarantee that all data with a given key would hit the same instance. Am I 
>> possibly seeing residual “keying” from the kafka topic?
>> 
>> I’m running flink 1.1.3 in scala. Please let me know if I can add more info.
>> 
>> Thanks,
>> 
>> Andrew
> 

Reply via email to