Done. https://issues.apache.org/jira/browse/FLINK-5299

On 08.12.2016 16:50, Ufuk Celebi wrote:
Would you like to open an issue for this for starters Chesnay? Would be good to 
fix for the upcoming release even.


On 8 December 2016 at 16:39:58, Chesnay Schepler (ches...@apache.org) wrote:
It would be neat if we could support arrays as keys directly; it should
boil down to checking the key type and in case of an array injecting a
KeySelector that calls Arrays.hashCode(array).
This worked for me when i ran into the same issue while experimenting
with some stuff.
The batch API can use arrays as keys as well, so it's also a matter of
consistency imo.
Regards,
Chesnay
On 08.12.2016 16:23, Ufuk Celebi wrote:
@Aljoscha: I remember that someone else ran into this, too. Should we address 
arrays
as keys specifically in the API? Prohibit? Document this?
– Ufuk

On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:
Sure!

(Aside, it turns out that the issue was using an `Array[Byte]` as a key - byte 
arrays
don’t
appear to have a stable hashCode. I’ll provide the skeleton for fullness, 
though.)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(Config.callAggregator.parallelism)

env.addSource(kafkaSource)
.flatMap(transformToRecords(_))
.keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
.map(new StatefulAggregator())
.addSink(hbaseSink)


Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!

-a



On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:

Hi,

could you maybe provide the (minimal) code for the problematic job? Also, are 
you
sure
that the keyBy is working on the correct key attribute?
Best,
Stefan

Am 07.12.2016 um 15:57 schrieb Andrew Roberts :

Hello,

I’m trying to perform a stateful mapping of some objects coming in from Kafka 
in a
parallelized
flink job (set on the job using env.setParallelism(3)). The data source is a 
kafka
topic,
but the partitions aren’t meaningfully keyed for this operation (each kafka 
message
is flatMapped to between 0-2 objects, with potentially different keys). I have 
a keyBy()
operator directly before my map(), but I’m seeing objects with the same key 
distributed
to different parallel task instances, as reported by 
getRuntimeContext().getIndexOfThisSubtask().
My understanding of keyBy is that it would segment the stream by key, and 
guarantee
that all data with a given key would hit the same instance. Am I possibly 
seeing residual
“keying” from the kafka topic?
I’m running flink 1.1.3 in scala. Please let me know if I can add more info.

Thanks,

Andrew



Reply via email to