Hi Arnaud,

Maybe I don't fully understand the constraints, but what about

stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());
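The following is a small, dependency-free Java model of that one-liner. Every name in it is hypothetical: PartitionTaggingMapper stands in for GetKuduPartitionMapper, which in a real job would extend Flink's RichMapFunction. A plain ToIntFunction stands in for the Kudu partition lookup. sinkIndexFor is only a deterministic-hash stand-in for keyBy routing, not Flink's actual key-group algorithm. The model shows the two properties the suggestion relies on: the connection-owning object is opened once and closed once around many map() calls, and equal partition ids always land on the same sink index.

```java
import java.util.function.ToIntFunction;

// Dependency-free model of "tag each record with its partition id, then keyBy".
// Hypothetical names: PartitionTaggingMapper ~ GetKuduPartitionMapper (a RichMapFunction),
// the ToIntFunction ~ a lookup through Kudu's partitioner client.
public class PartitionTaggingDemo {

    /** Minimal pair type standing in for Flink's Tuple2<PartitionId, Data>. */
    static final class Tagged<T> {
        final int partitionId;
        final T data;
        Tagged(int partitionId, T data) { this.partitionId = partitionId; this.data = data; }
    }

    /** Mirrors the open() / map() / close() lifecycle of a RichMapFunction. */
    static final class PartitionTaggingMapper<T> {
        private final ToIntFunction<T> lookup; // stand-in for the Kudu partition lookup
        private boolean isOpen;                // true between open() and close()
        int opens = 0, closes = 0;             // lifecycle counters, for checking

        PartitionTaggingMapper(ToIntFunction<T> lookup) { this.lookup = lookup; }

        // Real job: open the Kudu connection and build the partitioner once per subtask.
        void open() { isOpen = true; opens++; }

        // Real job: release the Kudu connection when the task shuts down.
        void close() { isOpen = false; closes++; }

        Tagged<T> map(T value) {
            if (!isOpen) throw new IllegalStateException("map() called before open()");
            return new Tagged<>(lookup.applyAsInt(value), value);
        }
    }

    /** Deterministic hash stand-in for keyBy routing (Flink's real key-group assignment differs). */
    static int sinkIndexFor(int partitionId, int parallelism) {
        int h = partitionId * 0x9E3779B1;            // multiplicative scramble, wraps on overflow
        return Math.floorMod(h ^ (h >>> 16), parallelism);
    }
}
```

With keyBy the sink index is derived from a hash of the id, so Kudu partition 3 is not necessarily handled by sink subtask 3, and two Kudu partitions may share a subtask: grouping is guaranteed, one-to-one alignment is not.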
The map(new GetKuduPartitionMapper()) will be a regular RichMapFunction with open() and close() methods, where you can manage the connection to Kudu's partitioning service. The map will output a Tuple2<PartitionId, Data> (or something nicer :) ), then keyBy(0) makes Flink route all records with the same partition id to the same sink instance, so the sinks process correctly partitioned data.

I hope that this is what you were looking for!

Best,
Robert

On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hello,
>
> I would like to improve the performance of my Apache Kudu sink by using
> the new "KuduPartitioner" of the Kudu API to match Flink stream partitions
> with Kudu partitions and reduce network shuffling.
>
> For that, I would like to implement something like
>
> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>
> with KuduFlinkPartitioner an implementation of
> org.apache.flink.api.common.functions.Partitioner
> that internally makes use of the KuduPartitioner client tool of Kudu's API.
>
> However, for that KuduPartitioner to work, it needs to open (and close at
> the end) a connection to the Kudu table, which obviously can't be done for
> each line. But there is no "AbstractRichPartitioner" with open() and
> close() methods that I can use for that (the way I use them in the sink,
> for instance).
>
> What is the best way to implement this?
>
> I thought of ThreadLocals that would be initialized during the first call
> to int partition(K key, int numPartitions); but I won't be able to
> close() things nicely, as I won't be notified on job termination.
>
> I thought of putting those static ThreadLocals inside an "identity mapper"
> that would be called just prior to the partitioning, with something like:
>
> stream.map(richIdentityConnectionManagerMapper).partitionCustom(new
> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>
> with Kudu connections initialized in the mapper's open(), closed in the
> mapper's close(), and used in the partitioner's partition().
>
> However, it looks like an ugly hack that breaks every coding principle,
> but as long as the threads are reused between the mapper and the
> partitioner, I think it should work.
>
> Is there a better way to do this?
>
> Best regards,
>
> Arnaud
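If one-to-one alignment between Kudu partitions and sink subtasks matters, a partition-tagging map step can be combined with the partitionCustom idea from the original question. Because the partition id is already computed upstream by a function that does have open() and close(), the Partitioner itself needs no Kudu connection and therefore no lifecycle hooks. The sketch below is a dependency-free model under that assumption. Its local Partitioner interface only mirrors the partition(K key, int numPartitions) shape of org.apache.flink.api.common.functions.Partitioner, and BY_KUDU_PARTITION and route are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Model of a stateless custom partitioner fed with precomputed Kudu partition ids.
// The local interface mirrors the shape of Flink's Partitioner so this runs without Flink.
public class AlignedRoutingDemo {

    @FunctionalInterface
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Sends Kudu partition id p to channel p mod n; no connection, so no open()/close() needed. */
    static final Partitioner<Integer> BY_KUDU_PARTITION =
        (partitionId, numPartitions) -> Math.floorMod(partitionId, numPartitions);

    /** Groups tagged partition ids by the channel (downstream subtask) each would be sent to. */
    static List<List<Integer>> route(int[] partitionIds, int numChannels) {
        List<List<Integer>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        for (int id : partitionIds) {
            channels.get(BY_KUDU_PARTITION.partition(id, numChannels)).add(id);
        }
        return channels;
    }
}
```

In a Flink job this would correspond roughly to calling partitionCustom with such a partitioner and a KeySelector that extracts the id field of the tagged Tuple2. The mapping is one-to-one only when the sink parallelism equals the number of Kudu partitions; with fewer sink subtasks, several Kudu partitions share one.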