[docs] Update docs on data types and serialization, to include type hints, type 
registration, and serializer registration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5160568
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5160568
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5160568

Branch: refs/heads/master
Commit: b516056869a825db0913ff852c071de198f4d390
Parents: 5f67b54
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 20 23:10:44 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 17:53:33 2016 +0200

----------------------------------------------------------------------
 docs/dev/types_serialization.md | 153 +++++++++++++++++++++++------------
 1 file changed, 102 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5160568/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 8a32491..4b8e25f 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -1,5 +1,5 @@
 ---
-title: "Data Types"
+title: "Data Types & Serialization"
 nav-id: types
 nav-parent_id: dev
 nav-pos: 9
@@ -23,13 +23,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink handles types in a unique way, containing its own type descriptors,
-generic type extraction, and type serialization framework.
-This document describes the concepts and the rationale behind them.
-
-There are fundamental differences in the way that the Scala API and
-the Java API handle type information, so most of the issues described
-here relate only to one of the to APIs.
+Apache Flink handles data types and serialization in a unique way, containing 
its own type descriptors,
+generic type extraction, and type serialization framework. This document 
describes the concepts and the rationale behind them.
 
 * This will be replaced by the TOC
 {:toc}
@@ -37,21 +32,43 @@ here relate only to one of the to APIs.
 
 ## Type handling in Flink
 
-Flink tries to know as much information about what types enter and leave user 
functions as possible.
-This stands in contrast to the approach to just assuming nothing and letting 
the
-programming language and serialization framework handle all types dynamically.
+Flink tries to infer a lot of information about the data types that are 
exchanged and stored during the distributed computation.
+Think about it like a database that infers the schema of tables. In most 
cases, Flink infers all necessary information seamlessly
+by itself. Having the type information allows Flink to do some cool things:
+
+* Using POJO types and grouping / joining / aggregating them by referring to 
field names (like `dataStream.keyBy("username")`).
+  The type information allows Flink to check (for typos and type 
compatibility) early rather than failing later at runtime.
+
+* The more Flink knows about data types, the better the serialization and data 
layout schemes are.
+  That is quite important for the memory usage paradigm in Flink (work on 
serialized data inside/outside the heap wherever possible
+  and make serialization very cheap).
+
+* Finally, it also spares users in the majority of cases from worrying about 
serialization frameworks and having to register types.
+
+In general, the information about data types is needed during the *pre-flight 
phase* - that is, when the program's calls on `DataStream`
+and `DataSet` are made, and before any call to `execute()`, `print()`, 
`count()`, or `collect()`.
+
+
+## Most Frequent Issues
+
+The most frequent issues where users need to interact with Flink's data type 
handling are:
 
-* To allow using POJOs and grouping/joining them by referring to field names, 
Flink needs the type
-  information to make checks (for typos and type compatibility) before the job 
is executed.
+* **Registering subtypes:** If the function signatures describe only the 
supertypes, but the functions actually use subtypes of those during execution,
+  it may increase performance a lot to make Flink aware of these subtypes.
+  For that, call `.registerType(clazz)` on the `StreamExecutionEnvironment` or 
`ExecutionEnvironment` for each subtype.
 
-* The more we know, the better serialization and data layout schemes the 
compiler/optimizer can develop.
-  That is quite important for the memory usage paradigm in Flink (work on 
serialized data
-  inside/outside the heap and make serialization very cheap).
+* **Registering custom serializers:** Flink falls back to 
[Kryo](https://github.com/EsotericSoftware/kryo) for the types that it does not 
handle transparently
+  by itself. Not all types are seamlessly handled by Kryo (and thus by Flink). 
For example, many Google Guava collection types do not work well
+  by default. The solution is to register additional serializers for the types 
that cause problems.
+  Call `.getConfig().addDefaultKryoSerializer(clazz, serializer)` on the 
`StreamExecutionEnvironment` or `ExecutionEnvironment`.
+  Additional Kryo serializers are available in many libraries.
 
-* For the upcoming logical programs (see roadmap draft) we need this to know 
the "schema" of functions.
+* **Adding Type Hints:** Sometimes, when Flink cannot infer the generic types 
despite all tricks, a user must pass a *type hint*. That is generally
+  only necessary in the Java API. The [Type Hints 
Section](#type-hints-in-the-java-api) describes that in more detail.
 
-* Finally, it also spares users having to worry about serialization frameworks 
and having to register
-  types at those frameworks.
+* **Manually creating a `TypeInformation`:** This may be necessary for some 
API calls where it is not possible for Flink to infer
+  the data types due to Java's generic type erasure. See [Creating a 
TypeInformation or 
TypeSerializer](#creating-a-typeinformation-or-typeserializer)
+  for details.
 
 
 ## Flink's TypeInformation class
@@ -75,7 +92,7 @@ Internally, Flink makes the following distinctions between 
types:
 
   * POJOs: classes that follow a certain bean-like pattern
 
-* Scala auxiliary types (Option, Either, Lists, Maps, ...)
+* Auxiliary types (Option, Either, Lists, Maps, ...)
 
 * Generic types: These will not be serialized by Flink itself, but by Kryo.
 
@@ -84,18 +101,66 @@ names in the definition of keys: 
`dataSet.join(another).where("name").equalTo("p
 They are also transparent to the runtime and can be handled very efficiently 
by Flink.
 
 
-**Rules for POJO types**
+#### Rules for POJO types
 
 Flink recognizes a data type as a POJO type (and allows "by-name" field 
referencing) if the following
 conditions are fulfilled:
 
 * The class is public and standalone (no non-static inner class)
 * The class has a public no-argument constructor
-* All fields in the class (and all superclasses) are either public or
-  or have a public getter and a setter method that follows the Java beans
+* All fields in the class (and all superclasses) are either public (and 
non-final)
+  or have public getter and setter methods that follow the Java beans
   naming conventions for getters and setters.
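
As a minimal sketch of a class that satisfies the rules above (the class name `WordWithCount` and its fields are hypothetical, and the class is a public static nested class only to keep the example in a single file):

```java
class PojoExample {

    // Public and standalone: a static nested class, not a non-static inner class.
    public static class WordWithCount {

        // A public, non-final field needs no getter or setter.
        public String word;

        // A private field is acceptable because it has a bean-style getter and setter.
        private long count;

        // Public no-argument constructor, required by the rules above.
        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static void main(String[] args) {
        WordWithCount w = new WordWithCount();
        w.word = "flink";
        w.setCount(3);
        System.out.println(w.word + " " + w.getCount());
    }
}
```

A class like this should be addressable by field name, for example with the key expressions `"word"` or `"count"`.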
 
 
+#### Creating a TypeInformation or TypeSerializer
+
+To create a TypeInformation object for a type, use the language-specific approach:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+Because Java generally erases generic type information, you need to pass the 
type to the TypeInformation
+construction:
+
+For non-generic types, you can pass the Class:
+{% highlight java %}
+TypeInformation<String> info = TypeInformation.of(String.class);
+{% endhighlight %}
+
+For generic types, you need to "capture" the generic type information via the 
`TypeHint`:
+{% highlight java %}
+TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new 
TypeHint<Tuple2<String, Double>>(){});
+{% endhighlight %}
+Internally, this creates an anonymous subclass of the TypeHint that captures 
the generic information to preserve it
+until runtime.
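
The anonymous-subclass trick can be illustrated with plain Java reflection. The following is a sketch only: `TypeCapture` is a hypothetical stand-in and not Flink's `TypeHint` implementation, but it relies on the same JVM behavior, namely that a subclass records the type arguments of its generic superclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a type hint class; NOT Flink's TypeHint.
abstract class TypeCapture<T> {

    private final Type captured;

    protected TypeCapture() {
        // For an anonymous subclass, the generic superclass is "TypeCapture<actual type>".
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Create TypeCapture as an anonymous subclass with a type argument");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getCapturedType() {
        return captured;
    }
}

class TypeCaptureDemo {

    // The trailing "{}" creates the anonymous subclass that preserves the type argument.
    static Type capturedMapType() {
        return new TypeCapture<Map<String, List<Double>>>() {}.getCapturedType();
    }

    public static void main(String[] args) {
        System.out.println(capturedMapType().getTypeName());
    }
}
```

Without the trailing `{}` there would be no subclass, and the type argument would be erased as usual.
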
+</div>
+
+<div data-lang="scala" markdown="1">
+In Scala, Flink uses *macros* that run at compile time and capture all 
generic type information while it is
+still available.
+{% highlight scala %}
+// important: this import is needed to access the 'createTypeInformation' 
macro function
+import org.apache.flink.streaming.api.scala._
+
+val stringInfo: TypeInformation[String] = createTypeInformation[String]
+
+val tupleInfo: TypeInformation[(String, Double)] = 
createTypeInformation[(String, Double)]
+{% endhighlight %}
+
+You can still use the same method as in Java as a fallback.
+</div>
+</div>
+
+To create a `TypeSerializer`, simply call `typeInfo.createSerializer(config)` 
on the `TypeInformation` object.
+
+The `config` parameter is of type `ExecutionConfig` and holds the information 
about the program's registered
+custom serializers. Wherever possible, try to pass the program's proper 
`ExecutionConfig`. You can usually
+obtain it from `DataStream` or `DataSet` via calling `getExecutionConfig()`. 
Inside functions (like `MapFunction`), you can
+get it by making the function a [Rich Function]() and calling 
`getRuntimeContext().getExecutionConfig()`.
+
+--------
+--------
+
 ## Type Information in the Scala API
 
 Scala has very elaborate concepts for runtime type information through *type 
manifests* and *class tags*. In
@@ -156,15 +221,15 @@ def selectFirst[T : TypeInformation](input: DataSet[(T, 
_)]) : DataSet[T] = {
 {% endhighlight %}
 
 
+--------
+--------
 
-## Type Information in the Java API
 
-Java in general erases generic type information. Only for subclasses of 
generic classes, the subclass
-stores the type to which the generic type variables bind.
+## Type Information in the Java API
 
-Flink uses reflection on the (anonymous) classes that implement the user 
functions to figure out the types of
-the generic parameters of the function. This logic also contains some simple 
type inference for cases where
-the return types of functions are dependent on input types, such as in the 
generic utility method below:
+In the general case, Java erases generic type information. Flink tries to 
reconstruct as much type information
+as possible via reflection, using the few bits that Java preserves (mainly 
function signatures and subclass information).
+This logic also contains some simple type inference for cases where the return 
type of a function depends on its input type:
 
 {% highlight java %}
 public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {
@@ -175,16 +240,14 @@ public class AppendOne<T> extends MapFunction<T, 
Tuple2<T, Long>> {
 }
 {% endhighlight %}
 
-Not in all cases can Flink figure out the data types of functions reliably in 
Java.
-Some issues remain with generic lambdas (we are trying to solve this with the 
Java community,
-see below) and with generic type variables that we cannot infer.
+There are cases where Flink cannot reconstruct all generic type information. 
In that case, a user has to help out via *type hints*.
 
 
 #### Type Hints in the Java API
 
-To help cases where Flink cannot reconstruct the erased generic type 
information, the Java API
-offers so called *type hints* from version 0.9 on. The type hints tell the 
system the type of
-the data set produced by a function. The following gives an example:
+In cases where Flink cannot reconstruct the erased generic type information, 
the Java API
+offers so-called *type hints*. The type hints tell the system the type of
+the data stream or data set produced by a function:
 
 {% highlight java %}
 DataSet<SomeType> result = dataSet
@@ -193,12 +256,11 @@ DataSet<SomeType> result = dataSet
 {% endhighlight %}
 
 The `returns` statement specifies the produced type, in this case via a class. 
The hints support
-type definition through
+type definition via
 
 * Classes, for non-parameterized types (no generics)
-* Strings in the form of `returns("Tuple2<Integer, my.SomeType>")`, which are 
parsed and converted
-  to a TypeInformation.
-* A TypeInformation directly
+* TypeHints in the form of `returns(new TypeHint<Tuple2<Integer, 
SomeType>>(){})`. The `TypeHint` class
+  can capture generic type information and preserve it for the runtime (via an 
anonymous subclass).
 
 
 #### Type extraction for Java 8 lambdas
@@ -208,18 +270,7 @@ with an implementing class that extends the function 
interface.
 
 Currently, Flink tries to figure out which method implements the lambda and 
uses Java's generic signatures to
 determine the parameter types and the return type. However, these signatures 
are not generated for lambdas
-by all compilers (as of writing this document only reliably by the Eclipse JDT 
compiler 4.5 from Milestone 2
-onwards)
-
-
-**Improving Type information for Java Lambdas**
-
-One of the Flink committers (Timo Walther) has actually become active in the 
Eclipse JDT compiler community and
-in the OpenJDK community and submitted patches to the compiler to improve 
availability of type information
-available for Java 8 lambdas.
-
-The Eclipse JDT compiler has added support for this as of version 4.5 M4. 
Discussion about the feature in the
-OpenJDK compiler is pending.
+by all compilers (as of this writing, only reliably by the Eclipse JDT 
compiler from version 4.5 onwards).
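
The difference between lambdas and anonymous classes can be observed with plain reflection. The sketch below is not Flink's extraction logic (the class and method names are hypothetical) and it only inspects class-level generic interfaces. It assumes a typical JDK, where the class generated for a lambda carries no generic signature.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class LambdaSignatureDemo {

    // True if the object's class declares at least one interface with concrete type arguments.
    static boolean exposesTypeArguments(Object function) {
        for (Type iface : function.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: the compiler stores "Function<String, Integer>" in the class file.
    static Function<String, Integer> anonymousLength() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Lambda: the class is generated at runtime and implements only the raw Function interface.
    static Function<String, Integer> lambdaLength() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + exposesTypeArguments(anonymousLength()));
        System.out.println("lambda:          " + exposesTypeArguments(lambdaLength()));
    }
}
```

When the type arguments are not available like this, a type hint is the usual way to supply them.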
 
 
 #### Serialization of POJO types
