Author: fhueske
Date: Fri Mar 13 22:04:36 2015
New Revision: 1666595

URL: http://svn.apache.org/r1666595
Log:
Fixed typos in join blog post (2)

Modified:
    flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md

Modified: flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md
URL: http://svn.apache.org/viewvc/flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md?rev=1666595&r1=1666594&r2=1666595&view=diff
==============================================================================
--- flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md (original)
+++ flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md Fri Mar 13 22:04:36 2015
@@ -8,23 +8,23 @@ categories: news
 ##Peeking into Apache Flink's Engine Room
 ####Join Processing in Apache Flink
 
-Joins are prevalent operations in many data processing applications. Most data processing systems feature APIs that make joining datasets very easy. However, the internal algorithms for join processing are much more involved especially if large datasets need to be efficiently handled. Therefore, join processing serves as a good example to discuss the salient design points and implementation details of a data processing system.
+Joins are prevalent operations in many data processing applications. Most data processing systems feature APIs that make joining data sets very easy. However, the internal algorithms for join processing are much more involved especially if large data sets need to be efficiently handled. Therefore, join processing serves as a good example to discuss the salient design points and implementation details of a data processing system.
 
 In this blog post, we cut through Apache Flink’s layered architecture and take a look at its internals with a focus on how it handles joins. Specifically, I will
 
-* show how easy it is to join datasets using Flink’s fluent APIs, 
+* show how easy it is to join data sets using Flink’s fluent APIs, 
 * discuss basic distributed join strategies, Flink’s join implementations, and its memory management,
 * talk about Flink’s optimizer that automatically chooses join strategies,
-* show some performance numbers for joining datasets of different sizes, and finally
-* briefly discuss joining of co-located and pre-sorted datasets.
+* show some performance numbers for joining data sets of different sizes, and finally
+* briefly discuss joining of co-located and pre-sorted data sets.
 
 *Disclaimer*: This blog post is exclusively about equi-joins. Whenever I say “join” in the following, I actually mean “equi-join”.
 
 ###How do I join with Flink?
 
-Flink provides fluent APIs in Java and Scala to write data flow programs. Flink’s APIs are centered around parallel data collections which are called datasets. datasets are processed by applying Transformations that compute new datasets. Flink’s transformations include Map and Reduce as known from MapReduce [[1]](http://research.google.com/archive/mapreduce.html) but also operators for joining, co-grouping, and iterative processing. The documentation gives an overview of all available transformations [[2]](http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html).
 
+Flink provides fluent APIs in Java and Scala to write data flow programs. Flink’s APIs are centered around parallel data collections which are called data sets. data sets are processed by applying Transformations that compute new data sets. Flink’s transformations include Map and Reduce as known from MapReduce [[1]](http://research.google.com/archive/mapreduce.html) but also operators for joining, co-grouping, and iterative processing. The documentation gives an overview of all available transformations [[2]](http://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html).
 
 
-Joining two Scala case class datasets is very easy as the following example shows:
+Joining two Scala case class data sets is very easy as the following example shows:
 
 ```scala
 // define your data types
@@ -37,7 +37,7 @@ val users: DataSet[User] = ...
 
 // filter the users data set
 val germanUsers = users.filter((u) => u.country.equals("de"))
-// join datasets
+// join data sets
 val germanVisits: DataSet[(PageVisit, User)] =
       // equi-join condition (PageVisit.userId = User.id)
      visits.join(germanUsers).where("userId").equalTo("id")
@@ -55,14 +55,14 @@ See the documentation for more details o
 
 ###How does Flink join my data?
 
-Flink uses techniques which are well known from parallel database systems to efficiently execute parallel joins. A join operator must establish all pairs of elements from its input datasets for which the join condition evaluates to true. In a standalone system, the most straight-forward implementation of a join is the so-called nested-loop join which builds the full Cartesian product and evaluates the join condition for each pair of elements. This strategy has quadratic complexity and does obviously not scale to large inputs.
+Flink uses techniques which are well known from parallel database systems to efficiently execute parallel joins. A join operator must establish all pairs of elements from its input data sets for which the join condition evaluates to true. In a standalone system, the most straight-forward implementation of a join is the so-called nested-loop join which builds the full Cartesian product and evaluates the join condition for each pair of elements. This strategy has quadratic complexity and does obviously not scale to large inputs.
 
 In a distributed system joins are commonly processed in two steps:
 
 1. The data of both inputs is distributed across all parallel instances that participate in the join and
 1. each parallel instance performs a standard stand-alone join algorithm on its local partition of the overall data.
 
-The distribution of data across parallel instances must ensure that each valid join pair can be locally built by exactly one instance. For both steps, there are multiple valid strategies that can be independently picked and which are favorable in different situations. In Flink terminology, the first phase is called Ship Strategy and the second phase Local Strategy. In the following I will describe Flink’s ship and local strategies to join two datasets *R* and *S*.
+The distribution of data across parallel instances must ensure that each valid join pair can be locally built by exactly one instance. For both steps, there are multiple valid strategies that can be independently picked and which are favorable in different situations. In Flink terminology, the first phase is called Ship Strategy and the second phase Local Strategy. In the following I will describe Flink’s ship and local strategies to join two data sets *R* and *S*.
 
 ####Ship Strategies
 Flink features two ship strategies to establish a valid data partitioning for a join:
@@ -70,7 +70,7 @@ Flink features two ship strategies to es
 * the *Repartition-Repartition* strategy (RR) and
 * the *Broadcast-Forward* strategy (BF).
 
-The Repartition-Repartition strategy partitions both inputs, R and S, on their join key attributes using the same partitioning function. Each partition is assigned to exactly one parallel join instance and all data of that partition is sent to its associated instance. This ensures that all elements that share the same join key are shipped to the same parallel instance and can be locally joined. The cost of the RR strategy is a full shuffle of both datasets over the network.
+The Repartition-Repartition strategy partitions both inputs, R and S, on their join key attributes using the same partitioning function. Each partition is assigned to exactly one parallel join instance and all data of that partition is sent to its associated instance. This ensures that all elements that share the same join key are shipped to the same parallel instance and can be locally joined. The cost of the RR strategy is a full shuffle of both data sets over the network.
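
To make the notion of a common partitioning function concrete, here is a minimal Scala sketch. It is purely illustrative and not Flink’s actual partitioner code; the name `assignPartition` and its parameters are made up for this example:

```scala
// Illustrative sketch only: a hash partitioner of the kind the RR strategy relies on.
// Running the join keys of both R and S through the same function guarantees that
// elements with equal keys land in the same partition, and hence on the same instance.
def assignPartition(joinKey: Any, numPartitions: Int): Int =
  ((joinKey.hashCode() % numPartitions) + numPartitions) % numPartitions
```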
 
 <center>
 <img src="{{ site.baseurl }}/img/blog/joins-broadcast.png" style="width:90%;margin:15px">
@@ -89,13 +89,13 @@ Before delving into the details of Flink
 
 Flink handles this challenge by actively managing its memory. When a worker node (TaskManager) is started, it allocates a fixed portion (70% by default) of the JVM’s heap memory that is available after initialization as 32KB byte arrays. These byte arrays are distributed as working memory to all algorithms that need to hold significant portions of data in memory. The algorithms receive their input data as Java data objects and serialize them into their working memory.
 
-This design has several nice properties. First, the number of data objects on the JVM heap is much lower resulting in less garbage collection pressure. Second, objects on the heap have a certain space overhead and the binary representation is more compact. Especially datasets of many small elements benefit from that. Third, an algorithm knows exactly when the input data exceeds its working memory and can react by writing some of its filled byte arrays to the worker’s local filesystem. After the content of a byte array is written to disk, it can be reused to process more data. Reading data back into memory is as simple as reading the binary data from the local filesystem. The following figure illustrates Flink’s memory management.
+This design has several nice properties. First, the number of data objects on the JVM heap is much lower resulting in less garbage collection pressure. Second, objects on the heap have a certain space overhead and the binary representation is more compact. Especially data sets of many small elements benefit from that. Third, an algorithm knows exactly when the input data exceeds its working memory and can react by writing some of its filled byte arrays to the worker’s local filesystem. After the content of a byte array is written to disk, it can be reused to process more data. Reading data back into memory is as simple as reading the binary data from the local filesystem. The following figure illustrates Flink’s memory management.
 
 <center>
 <img src="{{ site.baseurl }}/img/blog/joins-memmgmt.png" style="width:90%;margin:15px">
 </center>
 
-This active memory management makes Flink extremely robust for processing very large datasets on limited memory resources while preserving all benefits of in-memory processing if data is small enough to fit in-memory. De/serializing data into and from memory has a certain cost overhead compared to simply holding all data elements on the JVM’s heap. However, Flink features efficient custom de/serializers which also allow to perform certain operations such as comparisons directly on serialized data without deserializing data objects from memory.
+This active memory management makes Flink extremely robust for processing very large data sets on limited memory resources while preserving all benefits of in-memory processing if data is small enough to fit in-memory. De/serializing data into and from memory has a certain cost overhead compared to simply holding all data elements on the JVM’s heap. However, Flink features efficient custom de/serializers which also allow to perform certain operations such as comparisons directly on serialized data without deserializing data objects from memory.
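
To give a flavor of what operating directly on serialized data means, the following is a simplified sketch, not Flink’s actual comparator implementation; the helper name `compareSerializedKeys` is made up for this illustration. Comparing two keys in their binary form boils down to an unsigned byte-wise comparison:

```scala
// Simplified sketch: compare two serialized keys byte by byte,
// without deserializing them into Java objects first.
def compareSerializedKeys(a: Array[Byte], b: Array[Byte]): Int = {
  val n = math.min(a.length, b.length)
  var i = 0
  while (i < n) {
    val diff = (a(i) & 0xff) - (b(i) & 0xff) // unsigned byte comparison
    if (diff != 0) return diff
    i += 1
  }
  a.length - b.length // shorter key sorts first if all shared bytes are equal
}
```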
 
 ####Local Strategies
 
@@ -104,7 +104,7 @@ After the data has been distributed acro
 * the *Sort-Merge-Join* strategy (SM) and 
 * the *Hybrid-Hash-Join* strategy (HH).
 
-The Sort-Merge-Join works by first sorting both input datasets on their join key attributes (Sort Phase) and merging the sorted datasets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key. The figure below shows how the Sort-Merge-Join strategy works.
+The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key. The figure below shows how the Sort-Merge-Join strategy works.
 
 <center>
 <img src="{{ site.baseurl }}/img/blog/joins-smj.png" style="width:90%;margin:15px">
@@ -118,7 +118,7 @@ The Hybrid-Hash-Join distinguishes its i
 
 ###How does Flink choose join strategies?
 
-Ship and local strategies do not depend on each other and can be independently chosen. Therefore, Flink can execute a join of two datasets R and S in nine different ways by combining any of the three ship strategies (RR, BF with R being broadcasted, BF with S being broadcasted) with any of the three local strategies (SM, HH with R being build-side, HH with S being build-side). Each of these strategy combinations results in different execution performance depending on the data sizes and the available amount of working memory. In case of a small data set R and a much larger data set S, broadcasting R and using it as build-side input of a Hybrid-Hash-Join is usually a good choice because the much larger data set S is not shipped and not materialized (given that the hash table completely fits into memory). If both datasets are rather large or the join is performed on many parallel instances, repartitioning both inputs is a robust choice.
+Ship and local strategies do not depend on each other and can be independently chosen. Therefore, Flink can execute a join of two data sets R and S in nine different ways by combining any of the three ship strategies (RR, BF with R being broadcasted, BF with S being broadcasted) with any of the three local strategies (SM, HH with R being build-side, HH with S being build-side). Each of these strategy combinations results in different execution performance depending on the data sizes and the available amount of working memory. In case of a small data set R and a much larger data set S, broadcasting R and using it as build-side input of a Hybrid-Hash-Join is usually a good choice because the much larger data set S is not shipped and not materialized (given that the hash table completely fits into memory). If both data sets are rather large or the join is performed on many parallel instances, repartitioning both inputs is a robust choice.
 
 Flink features a cost-based optimizer which automatically chooses the execution strategies for all operators including joins. Without going into the details of cost-based optimization, this is done by computing cost estimates for execution plans with different strategies and picking the plan with the least estimated costs. Thereby, the optimizer estimates the amount of data which is shipped over the the network and written to disk. If no reliable size estimates for the input data can be obtained, the optimizer falls back to robust default choices. A key feature of the optimizer is to reason about existing data properties. For example, if the data of one input is already partitioned in a suitable way, the generated candidate plans will not repartition this input. Hence, the choice of a RR ship strategy becomes more likely. The same applies for previously sorted data and the Sort-Merge-Join strategy. Flink programs can help the optimizer to reason about existing data properties by providing semantic information about  user-defined functions [[4]](http://ci.apache.org/projects/flink/flink-docs-master/programming_guide.html#semantic-annotations).
 While the optimizer is a killer feature of Flink, it can happen that a user knows better than the optimizer how to execute a specific join. Similar to relational database systems, Flink offers optimizer hints to tell the optimizer which join strategies to pick [[5]](http://ci.apache.org/projects/flink/flink-docs-master/dataset_transformations.html#join-algorithm-hints).
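
As a sketch of what such a hint can look like in the Scala API documented in [5] (assuming the `visits` and `germanUsers` data sets from the example earlier in this post, and the `JoinHint` constants described on that page), broadcasting the small input and using it as the hash-table build side could be requested like this:

```scala
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala._

// Hint sketch (see [5]): broadcast the second input (germanUsers) and build
// the hash table on it; the larger visits data set is only probed.
val hintedJoin = visits
  .join(germanUsers, JoinHint.BROADCAST_HASH_SECOND)
  .where("userId").equalTo("id")
```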
 
@@ -132,7 +132,7 @@ Alright, that sounds good, but how fast
 
 The joins with 1 to 3 GB build side (blue bars) are pure in-memory joins. The other joins partially spill data to disk (4 to 12GB, orange bars). The results show that the performance of Flink’s Hybrid-Hash-Join remains stable as long as the hash table completely fits into memory. As soon as the hash table becomes larger than the working memory, parts of the hash table and corresponding parts of the probe side are spilled to disk. The chart shows that the performance of the Hybrid-Hash-Join gracefully decreases in this situation, i.e., there is no sharp increase in runtime when the join starts spilling. In combination with Flink’s robust memory management, this execution behavior gives smooth performance without the need for fine-grained, data-dependent memory tuning.
 
-So, Flink’s Hybrid-Hash-Join implementation performs well on a single thread even for limited memory resources, but how good is Flink’s performance when joining larger datasets in a distributed setting? For the next experiment we compare the performance of the most common join strategy combinations, namely:
+So, Flink’s Hybrid-Hash-Join implementation performs well on a single thread even for limited memory resources, but how good is Flink’s performance when joining larger data sets in a distributed setting? For the next experiment we compare the performance of the most common join strategy combinations, namely:
 
 * Broadcast-Forward, Hybrid-Hash-Join (broadcasting and building with the smaller side),
 * Repartition, Hybrid-Hash-Join (building with the smaller side), and
@@ -157,7 +157,7 @@ As expected, the Broadcast-Forward strat
 
 ###I’ve got sooo much data to join, do I really need to ship it?
 
-We have seen that off-the-shelf distributed joins work really well in Flink. But what if your data is so huge that you do not want to shuffle it across your cluster? We recently added some features to Flink for specifying semantic properties (partitioning and sorting) on input splits and co-located reading of local input files. With these tools at hand, it is possible to join pre-partitioned datasets from your local filesystem without sending a single byte over your cluster’s network. If the input data is even pre-sorted, the join can be done as a Sort-Merge-Join without sorting, i.e., the join is essentially done on-the-fly. Exploiting co-location requires a very special setup though. Data needs to be stored on the local filesystem because HDFS does not feature data co-location and might move file blocks across data nodes. That means you need to take care of many things yourself which HDFS would have done for you, including replication to avoid data loss. On the other hand, performance gains of joining co-located and pre-sorted can be quite substantial.
+We have seen that off-the-shelf distributed joins work really well in Flink. But what if your data is so huge that you do not want to shuffle it across your cluster? We recently added some features to Flink for specifying semantic properties (partitioning and sorting) on input splits and co-located reading of local input files. With these tools at hand, it is possible to join pre-partitioned data sets from your local filesystem without sending a single byte over your cluster’s network. If the input data is even pre-sorted, the join can be done as a Sort-Merge-Join without sorting, i.e., the join is essentially done on-the-fly. Exploiting co-location requires a very special setup though. Data needs to be stored on the local filesystem because HDFS does not feature data co-location and might move file blocks across data nodes. That means you need to take care of many things yourself which HDFS would have done for you, including replication to avoid data loss. On the other hand, performance gains of joining co-located and pre-sorted can be quite substantial.
 
 ###tl;dr: What should I remember from all of this?
 

