I'm not 100% sure about this answer, which is why I'm CCing Aljoscha to
correct me if needed:

Partitioners are not regular operators (like a map or a window), so they
are not part of the regular Task lifecycle (open() / map() etc. / close(),
with the proper error handling, task cancellation mechanisms etc.). The
custom partition function is called somewhere close to the network stack.
It would take quite a lot of effort (and add complexity to the codebase) to
allow for rich partitioners. Given that custom partitioners are a rarely
used feature, spending a lot of time on this would not be justified
(there's also a good workaround available).
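
To make the workaround concrete, here is a rough, dependency-free sketch of
the pattern from further down the thread: a mapper with open() / close()
tags each record with its partition id, the records are grouped by that id,
and the tag is dropped again. This is plain Java, not Flink API. In a real
job the mapper would be a RichMapFunction and the grouping would be done by
keyBy. All class and method names below (PartitionLookup, TaggingMapper,
tagAndGroup) are made up for illustration, and the hash-based lookup only
stands in for Kudu's partitioning service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the "tag in a rich mapper, then key by the tag" workaround. */
class TagThenKeyBySketch {

    /** Hypothetical stand-in for a client that needs a connection (e.g. a Kudu partitioner). */
    static final class PartitionLookup implements AutoCloseable {
        private final int numPartitions;
        private boolean open = true;

        PartitionLookup(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        int partitionOf(String row) {
            if (!open) {
                throw new IllegalStateException("lookup used after close()");
            }
            // Assumption: a simple hash replaces the real partitioning logic.
            return Math.floorMod(row.hashCode(), numPartitions);
        }

        @Override
        public void close() {
            open = false;
        }
    }

    /** A row tagged with its partition id, playing the role of Tuple2<PartitionId, Data>. */
    static final class Tagged {
        final int partitionId;
        final String row;

        Tagged(int partitionId, String row) {
            this.partitionId = partitionId;
            this.row = row;
        }
    }

    /** Mapper with an explicit lifecycle, mirroring open() / map() / close(). */
    static final class TaggingMapper {
        private PartitionLookup lookup;

        void open(int numPartitions) {
            lookup = new PartitionLookup(numPartitions); // opened once, not per record
        }

        Tagged map(String row) {
            return new Tagged(lookup.partitionOf(row), row);
        }

        void close() {
            if (lookup != null) {
                lookup.close();
            }
        }
    }

    /** Tags every row, groups rows by tag, and strips the tag from the grouped output. */
    static Map<Integer, List<String>> tagAndGroup(List<String> rows, int numPartitions) {
        TaggingMapper mapper = new TaggingMapper();
        mapper.open(numPartitions);
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        try {
            for (String row : rows) {
                Tagged tagged = mapper.map(row);
                byPartition.computeIfAbsent(tagged.partitionId, k -> new ArrayList<>())
                           .add(tagged.row);
            }
        } finally {
            mapper.close(); // the lifecycle hook a plain Partitioner does not have
        }
        return byPartition;
    }
}
```

The point of the design is that the connection lives in the mapper, which
has proper lifecycle hooks, while the shuffle itself only needs the
already-computed partition id.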


On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud <al...@bouyguestelecom.fr>
wrote:

> Hello,
>
>
>
> Yes, that would definitely do the trick, with an extra mapper after keyBy
> to remove the tuple so that it stays seamless. It’s less hacky than what I
> was thinking of, thanks!
>
> However, is there any plan to have rich partitioners in a future release?
> That would avoid adding overhead and “intermediate” technical info to the
> stream payload.
>
> Best,
>
> Arnaud
>
>
>
> *From:* Robert Metzger <rmetz...@apache.org>
> *Sent:* Friday, May 29, 2020 13:10
> *To:* LINZ, Arnaud <al...@bouyguestelecom.fr>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Best way to "emulate" a rich Partitioner with open() and
> close() methods ?
>
>
>
> Hi Arnaud,
>
>
>
> Maybe I don't fully understand the constraints, but what about
>
> stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());
>
>
> The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with
> open() and close() where you can handle the connection with Kudu's
> partitioning service.
>
> The map will output a Tuple2<PartitionId, Data> (or something nicer :) ),
> then Flink shuffles your data correctly, and the sinks will process the
> data correctly partitioned.
>
>
>
> I hope that this is what you were looking for!
>
>
>
> Best,
>
> Robert
>
>
>
> On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <al...@bouyguestelecom.fr>
> wrote:
>
> Hello,
>
>
>
> I would like to improve the performance of my Apache Kudu Sink by using
> the new “KuduPartitioner” of the Kudu API to match Flink stream partitions
> with Kudu partitions and reduce network shuffling.
>
> For that, I would like to implement something like
>
> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
> KuduSink(…));
>
> With KuduFlinkPartitioner being an implementation of
> org.apache.flink.api.common.functions.Partitioner
> that internally makes use of the KuduPartitioner client tool of Kudu’s API.
>
>
>
> However, for that KuduPartitioner to work, it needs to open – and close at
> the end – a connection to the Kudu table – obviously something that can’t
> be done for each line. But there is no “AbstractRichPartitioner” with
> open() and close() methods that I can use for that (the way I use it in the
> sink for instance).
>
>
>
> What is the best way to implement this?
>
> I thought of ThreadLocals that would be initialized during the first call
> to int partition(K key, int numPartitions); but I won’t be able to
> close() things nicely as I won’t be notified on job termination.
>
>
>
> I thought of putting those static ThreadLocals inside an “Identity Mapper”
> that would be called just prior to the partitioning, with something like:
>
> stream.map(richIdentityConnectionManagerMapper).partitionCustom(new
> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>
> with Kudu connections initialized in the mapper open(), closed in the
> mapper close(), and used in the partitioner partition().
>
> However, it looks like an ugly hack breaking every coding principle, but as
> long as the threads are reused between the mapper and the partitioner, I
> think that it should work.
>
>
>
> Is there a better way to do this?
>
>
>
> Best regards,
>
> Arnaud
>
>
>
>
>
>
