Hello,
After a quite successful benchmark yesterday (Flink being about twice as fast
as Spark on my use cases), I’ve instantly turned from a Spark fan into a Flink
fan. Great job, committers!
So I’ve decided to port my existing Spark tools to Flink. Happily, most of the
work consisted of renaming classes, packages and variables with “spark” in their
names to something more neutral ☺
However, there is one thing that is easy in Spark and that I still can’t figure
out how to do in Flink: generic keys.
I’m trying to write a framework on which my applications are built. That
framework therefore manipulates “generic types” representing the data, which
inherit from an abstract class with a common contract; let’s call it “Bean”.
Among other things, Bean exposes an abstract method

    public Key getKey();

where Key is one of my core types, used in several Java algorithms.
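To make the contract concrete, here is a minimal sketch of what Bean and Key could look like. The message does not show Key’s fields, so the (String, long) pair and the Order record below are purely hypothetical:

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical Key: the real fields are not shown, so a (String, long)
// pair is assumed here for illustration only.
final class Key implements Serializable {
    private final String part1;
    private final long part2;

    Key(String part1, long part2) {
        this.part1 = part1;
        this.part2 = part2;
    }

    // Value-based equality: two keys with the same fields are the same key.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Key)) return false;
        Key other = (Key) o;
        return part2 == other.part2 && Objects.equals(part1, other.part1);
    }

    // Consistent with equals(): equal keys produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(part1, part2);
    }
}

// The framework contract: every record type exposes its key.
abstract class Bean implements Serializable {
    public abstract Key getKey();
}

// Hypothetical concrete record built on that contract.
class Order extends Bean {
    private final String customer;
    private final long day;

    Order(String customer, long day) {
        this.customer = customer;
        this.day = day;
    }

    @Override
    public Key getKey() {
        return new Key(customer, day);
    }
}
```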
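Let’s say I have the following class:

    import java.io.Serializable;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.operators.UnsortedGrouping;

    public class Framework<T extends Bean> implements Serializable {
        public DataSet<T> doCoolStuff(final DataSet<T> inputDataset) {
            // Group lines according to a key extracted from each Bean
            final UnsortedGrouping<T> groupe = inputDataset.groupBy(new KeySelector<T, Key>() {
                @Override
                public Key getKey(T record) {
                    return record.getKey();
                }
            });
            (…)
        }
    }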
With Spark, a mapToPair works fine, because all I have to do is implement
hashCode() and equals() correctly on my Key type.
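For comparison, the contract Spark relies on here is the plain-Java hash contract: equal keys must have equal hash codes. The sketch below is a local illustration with a hypothetical HashGrouping helper (not Spark’s shuffle code); it groups records in a HashMap using nothing but the key’s hashCode() and equals():

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class HashGrouping {
    // Groups records by the key returned by keyOf. Only the key's
    // hashCode() and equals() are used to decide which group a record joins.
    static <T, K> Map<K, List<T>> groupBy(List<T> records, Function<T, K> keyOf) {
        Map<K, List<T>> groups = new HashMap<>();
        for (T record : records) {
            groups.computeIfAbsent(keyOf.apply(record), k -> new ArrayList<>()).add(record);
        }
        return groups;
    }
}
```

Any key type with a correct hashCode()/equals() pair works with such a helper, which is why the Spark port needed nothing more.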
With Flink, Key is not recognized as a POJO (which, to be fair, it is not), and
this does not work.
I have tried to expose something like public Tuple getKeyAsTuple(); in Key, but
Flink does not accept generic Tuples. I’ve also tried to parameterize my Tuple,
but Flink cannot infer the generic type values.
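A likely reason for the inference failure is Java type erasure. As far as I understand, Flink’s type extraction can only read type arguments that are written down in a class signature (for instance in an anonymous KeySelector subclass); a type variable such as T inside the generic Framework stays unresolved. A plain-Java illustration, where ErasureDemo is a hypothetical name:

```java
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {
    // Both lists share one runtime Class: String vs Integer is erased.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    // An anonymous subclass records a concrete type argument in its
    // generic superclass signature, so it can be read back via reflection.
    static String typeArgOfAnonymousSubclass() {
        List<String> list = new ArrayList<String>() {};
        ParameterizedType superType = (ParameterizedType) list.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0].getTypeName();
    }

    // Inside generic code, the same trick only yields the type variable's
    // name: the actual type chosen by the caller is not recorded anywhere.
    static <T> String typeArgInsideGenericMethod() {
        List<T> list = new ArrayList<T>() {};
        ParameterizedType superType = (ParameterizedType) list.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0].getTypeName();
    }
}
```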
So I’m wondering what the best way to implement this is.
For now I have exposed something like public String getKeyAsString(); in Key and
turned my generic processing into:

    final UnsortedGrouping<T> groupe = inputDataset.groupBy(new KeySelector<T, String>() {
        @Override
        public String getKey(T record) {
            // Group on a String rendering of the key instead of the Key itself
            return record.getKey().getKeyAsString();
        }
    });
But that “ASCII” representation is suboptimal.
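Besides its size, a string key built by concatenating fields can be ambiguous when a field may itself contain the separator. Since the real getKeyAsString() is not shown, the two-field encodings below (in a hypothetical StringKeys helper) are only an illustration:

```java
final class StringKeys {
    // Naive encoding: join the fields with a separator. Two different
    // keys collide if a field can contain "|".
    static String naive(String a, String b) {
        return a + "|" + b;
    }

    // Length-prefixing each field removes the ambiguity, at the cost
    // of an even longer string.
    static String lengthPrefixed(String a, String b) {
        return a.length() + ":" + a + b.length() + ":" + b;
    }
}
```

For example, naive("x|y", "z") and naive("x", "y|z") both produce "x|y|z", whereas the length-prefixed forms are "3:x|y1:z" and "1:x3:y|z".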
I thought of passing a key-to-tuple conversion lambda when creating the
Framework class, but that would mean boilerplate code on the user’s end, which
I’m not fond of.
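The constructor-lambda alternative could look roughly like the following; KeyConverter and ConfigurableFramework are hypothetical names. One detail that would matter with Flink is that user functions are shipped to the workers, so the lambda has to be serializable, which Java only does when the lambda’s target type is Serializable (here via KeyConverter extending Serializable):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical conversion function supplied by the framework user.
// Extending Serializable makes lambdas assigned to it serializable too.
interface KeyConverter<R, K> extends Function<R, K>, Serializable {}

final class ConfigurableFramework<R> implements Serializable {
    private final KeyConverter<R, ?> converter;

    ConfigurableFramework(KeyConverter<R, ?> converter) {
        this.converter = converter;
    }

    // Applies the user-supplied conversion to one record.
    Object keyOf(R record) {
        return converter.apply(record);
    }

    // Returns true if this object, including the stored lambda,
    // can be written with standard Java serialization.
    boolean isSerializable() {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(this);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```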
So my questions are:
- Is there a smarter way to do this?
- What kind of objects can be passed as a Key? Is there an interface
  to implement?
- In the worst case, is byte[] OK as a Key? (I can code the
  serialization on the framework side…)
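One plain-Java caveat behind the byte[] question, independent of what Flink itself accepts as a key: arrays inherit identity-based equals() and hashCode() from Object, so two arrays with the same content are different keys in any hash-based grouping. Wrapping them, for example in java.nio.ByteBuffer, restores content-based comparison. ByteArrayKeys is a hypothetical helper:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class ByteArrayKeys {
    // Raw arrays as HashMap keys: compared by identity, not content.
    static int distinctRaw(byte[] a, byte[] b) {
        Map<byte[], Integer> m = new HashMap<>();
        m.put(a, 1);
        m.put(b, 2);
        return m.size();
    }

    // ByteBuffer's equals()/hashCode() depend on the remaining bytes,
    // so arrays with equal content map to the same key.
    static int distinctWrapped(byte[] a, byte[] b) {
        Map<ByteBuffer, Integer> m = new HashMap<>();
        m.put(ByteBuffer.wrap(a), 1);
        m.put(ByteBuffer.wrap(b), 2);
        return m.size();
    }
}
```

Note that a ByteBuffer is mutable, so the wrapped array must not change while it is used as a key.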
Best regards,
Arnaud