Hi Arnaud,

Maybe I don't fully understand the constraints, but what about:
stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink(…));

The map(new GetKuduPartitionMapper()) will be a regular RichMapFunction with
open() and close() methods, where you can handle the connection to Kudu's
partitioning service.
The map will output a Tuple2<PartitionId, Data> (or something nicer :) ).
keyBy(0) then makes Flink shuffle the records by partition id, so every
record of a given Kudu partition reaches the same sink instance, and the
sinks process data that is already partitioned correctly.
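Here is a minimal, dependency-free sketch of this mapper pattern. All names are hypothetical. `GetPartitionMapper` only mimics the open()/map()/close() lifecycle of a Flink RichMapFunction. `ModuloLookup` stands in for Kudu's KuduPartitioner, and it uses hash-modulo rather than Kudu's real partitioning. `PipelineSketch.run` imitates keyBy(0) by grouping the tagged records by partition id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kudu's KuduPartitioner.
// Placeholder logic: hash(key) mod numPartitions, NOT Kudu's real partitioning.
final class ModuloLookup implements AutoCloseable {
    private final int numPartitions;
    private boolean closed = false;

    ModuloLookup(int numPartitions) { this.numPartitions = numPartitions; }

    int partitionFor(String rowKey) {
        if (closed) throw new IllegalStateException("lookup already closed");
        return Math.floorMod(rowKey.hashCode(), numPartitions);
    }

    @Override
    public void close() { closed = true; }

    boolean isClosed() { return closed; }
}

// Mimics a RichMapFunction: the expensive lookup is created in open(),
// reused for every record in map(), and released in close().
final class GetPartitionMapper {
    private final int numPartitions;
    private ModuloLookup lookup;

    GetPartitionMapper(int numPartitions) { this.numPartitions = numPartitions; }

    void open() { lookup = new ModuloLookup(numPartitions); }

    // Emits (partitionId, record), playing the role of Tuple2<PartitionId, Data>.
    Map.Entry<Integer, String> map(String record) {
        return Map.entry(lookup.partitionFor(record), record);
    }

    void close() { if (lookup != null) lookup.close(); }

    ModuloLookup lookup() { return lookup; }
}

final class PipelineSketch {
    // Tags each record with its partition id, then groups by that id. This mirrors
    // what keyBy(0) guarantees: all records with the same id reach the same instance.
    static Map<Integer, List<String>> run(List<String> input, int numPartitions) {
        GetPartitionMapper mapper = new GetPartitionMapper(numPartitions);
        mapper.open();
        Map<Integer, List<String>> byPartition = new LinkedHashMap<>();
        try {
            for (String record : input) {
                Map.Entry<Integer, String> tagged = mapper.map(record);
                byPartition.computeIfAbsent(tagged.getKey(), k -> new ArrayList<>())
                           .add(tagged.getValue());
            }
        } finally {
            mapper.close();
        }
        return byPartition;
    }
}
```

In a real job, open() would build the Kudu client and partitioner and close() would release them. The connection is then set up once per parallel mapper instance instead of once per record.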

I hope that this is what you were looking for!

Best,
Robert

On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <al...@bouyguestelecom.fr>
wrote:

> Hello,
>
> I would like to improve the performance of my Apache Kudu sink by using
> the new “KuduPartitioner” of the Kudu API to match Flink stream partitions
> with Kudu partitions and reduce network shuffling.
>
> For that, I would like to implement something like:
>
> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
> KuduSink(…));
>
> where KuduFlinkPartitioner is an implementation of
> org.apache.flink.api.common.functions.Partitioner
> that internally uses the KuduPartitioner client tool of Kudu’s API.
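The Partitioner contract mentioned above has only one method, int partition(K key, int numPartitions), and no lifecycle hooks. The sketch below shows how a partitioner plugs into routing. It declares a local interface with that same shape (`FlinkLikePartitioner`, hypothetical, so it compiles without Flink). `PartitionIdPartitioner` is a hypothetical partitioner whose key is a Kudu partition id computed earlier. `CustomPartitionSketch.route` imitates how partitionCustom sends each record to the channel that partition() returns.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the shape of org.apache.flink.api.common.functions.Partitioner,
// declared here so the sketch compiles without Flink on the classpath.
interface FlinkLikePartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical partitioner whose key is an already-computed Kudu partition id.
// It holds no connection, so the lack of open()/close() hooks does not matter.
final class PartitionIdPartitioner implements FlinkLikePartitioner<Integer> {
    @Override
    public int partition(Integer kuduPartitionId, int numPartitions) {
        // Kudu partition i always goes to channel (i mod numPartitions).
        return Math.floorMod(kuduPartitionId, numPartitions);
    }
}

final class CustomPartitionSketch {
    // Simulates partitionCustom: each key is appended to the channel the partitioner picks.
    static List<List<Integer>> route(List<Integer> keys,
                                     FlinkLikePartitioner<Integer> partitioner,
                                     int numChannels) {
        List<List<Integer>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) channels.add(new ArrayList<>());
        for (Integer key : keys) {
            channels.get(partitioner.partition(key, numChannels)).add(key);
        }
        return channels;
    }
}
```

A stateless partitioner like this could, for example, be applied with partitionCustom and a key selector that extracts a partition id emitted by an upstream mapper. Unlike keyBy, which hashes the key, this should keep a fixed mapping from Kudu partition to sink subtask.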
>
> However, for that KuduPartitioner to work, it needs to open (and close at
> the end) a connection to the Kudu table, which obviously can’t be done for
> each record. But there is no “AbstractRichPartitioner” with open() and
> close() methods that I could use for that (the way I do in the sink, for
> instance).
>
> What is the best way to implement this?
>
> I thought of using ThreadLocals that would be initialized during the first
> call to int partition(K key, int numPartitions), but I won’t be able to
> close() things nicely, as I won’t be notified of job termination.
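The lazy ThreadLocal idea can be sketched without Flink or Kudu. `CountingConnection` is a hypothetical stand-in for a Kudu connection that counts how often it is opened and closed. `LazyThreadLocalPartitioner` is hypothetical and only mimics the partition() signature. The sketch makes the problem concrete: each thread opens a connection on its first call, and nothing in the partitioner ever closes it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Kudu connection; counts opens and closes.
final class CountingConnection implements AutoCloseable {
    static final AtomicInteger OPENED = new AtomicInteger();
    static final AtomicInteger CLOSED = new AtomicInteger();

    CountingConnection() { OPENED.incrementAndGet(); }

    // Placeholder lookup, not Kudu's real partitioning.
    int lookup(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    @Override
    public void close() { CLOSED.incrementAndGet(); }
}

// Lazily opens one connection per thread on the first partition() call.
// A plain Partitioner has no close() callback, so nothing here can call
// CountingConnection.close() when the job terminates.
final class LazyThreadLocalPartitioner {
    private static final ThreadLocal<CountingConnection> CONN =
            ThreadLocal.withInitial(CountingConnection::new);

    int partition(String key, int numPartitions) {
        return CONN.get().lookup(key, numPartitions);
    }
}
```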
>
> I thought of putting those static ThreadLocals inside an “identity mapper”
> that would be called just before the partitioning, with something like:
>
> stream.map(richIdentityConnectionManagerMapper).partitionCustom(new
> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>
> with the Kudu connections initialized in the mapper’s open(), closed in the
> mapper’s close(), and used in the partitioner’s partition().
>
> However, it looks like an ugly hack that breaks every coding principle,
> but as long as the mapper and the partitioner run in the same threads, I
> think it should work.
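The identity-mapper workaround can be sketched the same way, together with its caveat that the mapper and partitioner must share threads. All names are hypothetical. `IdentityConnectionMapper` mimics a rich mapper that sets a static ThreadLocal in open() and clears it in close(). `ThreadLocalKuduPartitioner` reads that ThreadLocal in partition(). The call from another thread shows what would happen if the threading assumption did not hold.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical connection stand-in; placeholder lookup, not Kudu's real partitioning.
final class SharedConnection {
    int lookup(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

// Static holder shared by the "identity mapper" and the partitioner.
final class ConnectionHolder {
    static final ThreadLocal<SharedConnection> CURRENT = new ThreadLocal<>();
}

// Mimics a rich identity mapper: manages the connection, passes records through unchanged.
final class IdentityConnectionMapper {
    void open() { ConnectionHolder.CURRENT.set(new SharedConnection()); }
    String map(String record) { return record; }
    void close() { ConnectionHolder.CURRENT.remove(); }
}

// Mimics the partitioner: relies on open() having run in the SAME thread.
final class ThreadLocalKuduPartitioner {
    int partition(String key, int numPartitions) {
        SharedConnection conn = ConnectionHolder.CURRENT.get();
        if (conn == null) {
            throw new IllegalStateException(
                    "no connection in thread " + Thread.currentThread().getName());
        }
        return conn.lookup(key, numPartitions);
    }

    // Calls partition() from a fresh thread and reports whether it found a connection.
    boolean worksFromAnotherThread(String key, int numPartitions) {
        AtomicBoolean ok = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            try {
                partition(key, numPartitions);
                ok.set(true);
            } catch (IllegalStateException e) {
                ok.set(false);
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ok.get();
    }
}
```

The sketch does not verify whether Flink actually invokes the partitioner in the same thread as the upstream mapper. That is exactly the assumption the hack rests on.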
>
> Is there a better way to do this?
>
> Best regards,
>
> Arnaud
>
> ------------------------------
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>
