Hi Arnaud,

Just to add to this: the overhead of this additional map is negligible if you
enable object reuse [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
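
For reference, a minimal sketch of switching this on, assuming a DataStream
job (the class name is only a placeholder). With object reuse enabled, Flink
passes records between chained operators by reference instead of copying
them, which is where the saving for the extra map comes from, assuming that
map ends up chained to the sink. The caveat is that your functions should not
hold on to or modify records after handing them downstream.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Placeholder job class; only the enableObjectReuse() call matters here.
public class ObjectReuseConfig {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip the defensive copy between chained operators.
        env.getConfig().enableObjectReuse();

        // ... build the pipeline here, then call env.execute("job name")
    }
}
```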

On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger <rmetz...@apache.org> wrote:

> I'm not 100% sure about this answer, that's why I'm CCing Aljoscha to
> correct me if needed:
>
> Partitioners are not regular operators (like a map or window), thus they
> are not included in the regular Task lifecycle methods (of open() / map()
> etc. / close(), with the proper error handling, task cancellation
> mechanisms etc.). The custom partition function is called somewhere close
> to the network stack.
> It would be quite a lot of effort (and added complexity to the codebase)
> to allow for rich partitioners. Given that custom partitioners are a rarely
> used feature, it would not be justified to spend a lot of time on this
> (there's also a good workaround available).
>
>
> On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud <al...@bouyguestelecom.fr>
> wrote:
>
>> Hello,
>>
>>
>>
>> Yes, that would definitely do the trick, with an extra mapper after keyBy
>> to remove the tuple so that it stays seamless. It’s less hacky than what I
>> was thinking of, thanks!
>>
>> However, is there any plan to have rich partitioners in a future release?
>> That would avoid adding overhead and “intermediate” technical info to
>> the stream payload.
>>
>> Best,
>>
>> Arnaud
>>
>>
>>
>> *From:* Robert Metzger <rmetz...@apache.org>
>> *Sent:* Friday, May 29, 2020 13:10
>> *To:* LINZ, Arnaud <al...@bouyguestelecom.fr>
>> *Cc:* user <user@flink.apache.org>
>> *Subject:* Re: Best way to "emulate" a rich Partitioner with open() and
>> close() methods ?
>>
>>
>>
>> Hi Arnaud,
>>
>>
>>
>> Maybe I don't fully understand the constraints, but what about
>>
>> stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());
>>
>>
>> The map(new GetKuduPartitionMapper()) will be a regular RichMapFunction
>> with open() and close() where you can handle the connection to Kudu's
>> partitioning service.
>>
>> The map will output a Tuple2<PartitionId, Data> (or something nicer :) ),
>> then Flink shuffles your data correctly, and the sinks will process the
>> data correctly partitioned.
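
To make the shape of this workaround concrete, here is a rough,
dependency-free model of it in plain Java. It is only a sketch: none of these
types are Flink or Kudu classes, and PartitionLookup, Tagged, TaggingMapper
and route are hypothetical names. PartitionLookup stands in for the Kudu
connection, TaggingMapper mirrors the open() / map() / close() lifecycle of a
rich map function, and route plays the role of keyBy plus the extra map that
strips the tag again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "tag, shuffle, strip"; all names are hypothetical.
public class TagShuffleStrip {

    // Stand-in for a client that is costly to open and must be closed.
    static final class PartitionLookup implements AutoCloseable {
        private final int numPartitions;
        private boolean open = true;

        PartitionLookup(int numPartitions) { this.numPartitions = numPartitions; }

        int partitionOf(String row) {
            if (!open) throw new IllegalStateException("lookup is closed");
            return Math.floorMod(row.hashCode(), numPartitions);
        }

        boolean isOpen() { return open; }

        @Override public void close() { open = false; }
    }

    // A record tagged with its target partition (the Tuple2 in the thread).
    static final class Tagged {
        final int partition;
        final String row;
        Tagged(int partition, String row) { this.partition = partition; this.row = row; }
    }

    // Mirrors the open() / map() / close() lifecycle of a rich map function.
    static final class TaggingMapper {
        PartitionLookup lookup;
        void open(int numPartitions) { lookup = new PartitionLookup(numPartitions); }
        Tagged map(String row) { return new Tagged(lookup.partitionOf(row), row); }
        void close() { lookup.close(); }
    }

    // Tags every row, groups by tag (the shuffle), and keeps only the payload.
    static Map<Integer, List<String>> route(List<String> rows, TaggingMapper mapper) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String row : rows) {
            Tagged tagged = mapper.map(row);
            byPartition.computeIfAbsent(tagged.partition, p -> new ArrayList<>())
                       .add(tagged.row);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        TaggingMapper mapper = new TaggingMapper();
        mapper.open(4);                 // connection opened once, up front
        try {
            System.out.println(route(Arrays.asList("a", "b", "c", "d", "e"), mapper));
        } finally {
            mapper.close();             // and released once at the end
        }
    }
}
```

The point of the model is that the lookup is opened and closed exactly once by
the component that owns the lifecycle, while the routing step only ever sees
the precomputed tag.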
>>
>>
>>
>> I hope that this is what you were looking for!
>>
>>
>>
>> Best,
>>
>> Robert
>>
>>
>>
>> On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <al...@bouyguestelecom.fr>
>> wrote:
>>
>> Hello,
>>
>>
>>
>> I would like to improve the performance of my Apache Kudu sink by using
>> the new “KuduPartitioner” of the Kudu API to match Flink stream partitions with
>> Kudu partitions and reduce network shuffling.
>>
>> For that, I would like to implement something like
>>
>> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
>> KuduSink(…));
>>
>> with KuduFlinkPartitioner being an implementation of
>> org.apache.flink.api.common.functions.Partitioner
>> that internally makes use of the KuduPartitioner client tool of Kudu’s API.
>>
>>
>>
>> However, for that KuduPartitioner to work, it needs to open (and close at
>> the end) a connection to the Kudu table, which obviously can’t
>> be done for each record. But there is no “AbstractRichPartitioner” with
>> open() and close() methods that I can use for that (the way I use them in the
>> sink, for instance).
>>
>>
>>
>> What is the best way to implement this?
>>
>> I thought of ThreadLocals that would be initialized during the first call
>> to int partition(K key, int numPartitions); but I won’t be able to
>> close() things cleanly, as I won’t be notified on job termination.
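
The problem described here can be reproduced in a few lines of plain Java. This
is a sketch under assumptions, not Flink or Kudu code: Connection is a
hypothetical stand-in for the Kudu connection, and the two counters exist only
to make the lifecycle visible. The connection is created lazily on the first
partition() call of each thread, but nothing ever calls close() on it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java illustration of the lazy ThreadLocal idea; names are hypothetical.
public class LazyThreadLocalPartitioner {

    static final AtomicInteger OPENED = new AtomicInteger();
    static final AtomicInteger CLOSED = new AtomicInteger();

    // Stand-in for a connection that should be closed when the job ends.
    static final class Connection implements AutoCloseable {
        Connection() { OPENED.incrementAndGet(); }

        int partitionOf(String key, int numPartitions) {
            return Math.floorMod(key.hashCode(), numPartitions);
        }

        @Override public void close() { CLOSED.incrementAndGet(); }
    }

    // One connection per calling thread, created on first use.
    private static final ThreadLocal<Connection> CONNECTION =
            ThreadLocal.withInitial(Connection::new);

    // Same shape as a partition(key, numPartitions) callback: no open(), no close().
    static int partition(String key, int numPartitions) {
        return CONNECTION.get().partitionOf(key, numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "c"}) {
            partition(key, 4);
        }
        // The connection was opened once for this thread and never closed.
        System.out.println("opened=" + OPENED.get() + ", closed=" + CLOSED.get());
    }
}
```

Since the partition callback has no termination hook, the only way to close
the connection in this design is from somewhere else that does have one, which
is what leads to the identity-mapper idea.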
>>
>>
>>
>> I thought of putting those static ThreadLocals inside an “identity mapper”
>> that would be called just before the partitioning, with something like:
>>
>> stream.map(richIdentityConnectionManagerMapper).partitionCustom(new
>> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>>
>> with Kudu connections initialized in the mapper's open(), closed in the
>> mapper's close(), and used in the partitioner's partition().
>>
>> However, it looks like an ugly hack that breaks every coding principle, but
>> as long as the threads are reused between the mapper and the partitioner, I
>> think that it should work.
>>
>>
>>
>> Is there a better way to do this?
>>
>>
>>
>> Best regards,
>>
>> Arnaud

-- 

Arvid Heise | Senior Java Developer
