Hi,
I see. My Key class is an abstract class, which subclasses are Key1<?>, 
Key2<?,?> etc, so it’s very like a tuple. It is heavily used in 
“non-distributed” hash maps once the dataset is reduced to fit on a single JVM.
It exposes the common contract that I need (such as getHeadKey(), getLastl(), 
or makeKey(Key,Object)) to “navigate” in the key space, and a cached hash code 
to make hash maps faster. My generic algorithms do not need to know how many 
fields are exposed in the Key, but they need to be able to construct another 
key from two keys.

Arnaud

De : ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] De la part de Stephan 
Ewen
Envoyé : vendredi 24 avril 2015 11:14
À : user@flink.apache.org
Objet : Re: How to make a generic key for groupBy

Hi Arnaud!

Thank you for the warm words! Let's find a good way to get this to work...

As a bit of background:
In Flink, the API needs to now a bit about the types that go through the 
functions, because Flink pre-generates and configures serializers, and 
validates that things fit together.

It is also important that keys are exposed rather specifically, because Flink 
internally tries to work on serialized data (that makes it in-memory operations 
predictable and robust).

If you expose a key as a "String", or "long" or "double", then Flink knows how 
to work on it in a binary fashion.
Also, if you expose a key as a POJO, then Flink interprets the key as a 
combination of the fields, and can again work on the serialized data.

If you only expose "Comparable" (which is the bare minimum for a key), you 
experience performance degradation (most notably for sorts), because every key 
operation involves serialization and deserialization.

So the goal would be to expose the key properly. We can always hint to the API 
what the key type is, precisely for the cases where the inference cannot do it.
  - To understand things a bit better: What is your "Key" type? Is it an 
abstract class, an interface, a generic parameter?


Greetings,
Stephan


FYI: In Scala, this works actually quite a bit easier, since Scala does 
preserve generic types. In Java, we built a lot of reflection tooling, but 
there are cases where it is impossible to infer the types via reflection, like 
yours.



On Thu, Apr 23, 2015 at 6:35 PM, Soumitra Kumar 
<kumar.soumi...@gmail.com<mailto:kumar.soumi...@gmail.com>> wrote:
Will you elaborate on your use case? It would help to find out where Flink 
shines. IMO, its a great project, but needs more differentiation from Spark.

On Thu, Apr 23, 2015 at 7:25 AM, LINZ, Arnaud 
<al...@bouyguestelecom.fr<mailto:al...@bouyguestelecom.fr>> wrote:
Hello,

After a quite successful benchmark yesterday (Flink being about twice faster 
than Spark on my use cases), I’ve turned instantly from spark-fan to flink-fan 
– great job, committers!
So I’ve decided to port my existing Spark tools to Flink. Happily, most of the 
difficulty was renaming classes, packages and variables with “spark” in them to 
something more neutral ☺

However there is one easy thing in Spark I’m still wondering how to do in Flink 
: generic keys.

I’m trying to make a framework on which my applications are built. That 
framework thus manipulate “generic types” representing the data, inheriting 
from an abstract class with a common contract, let’s call it “Bean”.

Among other things Bean exposes an abstract method
public Key getKey();

Key being one of my core types used in several java algorithms.

Let’s say I have the class :
public class Framework<T extends Bean> implements Serializable {

public DataSet<T> doCoolStuff(final DataSet<T> inputDataset) {
        // Group lines according to a key
        final UnsortedGrouping<YT> groupe = inputDataset.groupBy(new 
KeySelector<T, Key>() {
            @Override
            public Key getKey(T record)  {
                return record.getKey();
            }
        });
             (…)
       }
}

With Spark, a mapToPair works fine because all I have to do is implements 
correctly hashCode() and equals() on my Key type.
With Flink, Key is not recognized as a POJO object (well it is not) and that 
does not work.

I have tried to expose something like public Tuple getKeyAsTuple(); in Key but 
Flink does not accept generic Tuples. I’ve tried to parameterize my Tuple but 
Flink does not know how to infer
the generic type value.

So I’m wondering what is the best way to implement it.
For now I have exposed something like public String getKeyAsString(); and 
turned my generic treatment into :
final UnsortedGrouping<YT> groupe = inputDataset.groupBy(new KeySelector<T, 
String>() {
            @Override
            public String getKey(T record)  {
                return record.getKey().getKeyAsString();
            }
        });
But that “ASCII” representation is suboptimal.

I thought of passing a key to tuple conversion lambda upon creation of the 
Framework class but that would be boiler-plate code on the user’s end, which 
I’m not fond of.

So my questions are :

-          Is there a smarter way to do this ?

-          What kind of objects can be passed as a Key ? Is there an Interface 
to respect ?

-          In the worst case, is byte[]  ok as a Key ? (I can code the 
serialization on the framework side…)


Best regards,
Arnaud


________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société 
expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Reply via email to