http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/iterations.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/iterations.md b/docs/apis/batch/iterations.md
new file mode 100644
index 0000000..912f378
--- /dev/null
+++ b/docs/apis/batch/iterations.md
@@ -0,0 +1,213 @@
+---
+title: "Iterations"
+
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Iterative algorithms occur in many domains of data analysis, such as *machine learning* or *graph analysis*. Such algorithms are crucial for realizing the promise of Big Data: extracting meaningful information from your data. With increasing interest in running these kinds of algorithms on very large data sets, there is a need to execute iterations in a massively parallel fashion.
+
+Flink programs implement iterative algorithms by defining a **step function** and embedding it into a special iteration operator. There are two variants of this operator: **Iterate** and **Delta Iterate**. Both operators repeatedly invoke the step function on the current iteration state until a certain termination condition is reached.
+
+Here, we provide background on both operator variants and outline their usage. The [programming guide](index.html) explains how to implement the operators in both Scala and Java. We also support both **vertex-centric and gather-sum-apply iterations** through Flink's graph processing API, [Gelly]({{site.baseurl}}/libs/gelly_guide.html).
+
+The following table provides an overview of both operators:
+
+
+<table class="table table-striped table-hover table-bordered">
+  <thead>
+    <th></th>
+    <th class="text-center">Iterate</th>
+    <th class="text-center">Delta Iterate</th>
+  </thead>
+  <tr>
+    <td class="text-center" width="20%"><strong>Iteration Input</strong></td>
+    <td class="text-center" width="40%"><strong>Partial Solution</strong></td>
+    <td class="text-center" width="40%"><strong>Workset</strong> and <strong>Solution Set</strong></td>
+  </tr>
+  <tr>
+    <td class="text-center"><strong>Step Function</strong></td>
+    <td colspan="2" class="text-center">Arbitrary Data Flows</td>
+  </tr>
+  <tr>
+    <td class="text-center"><strong>State Update</strong></td>
+    <td class="text-center">Next <strong>partial solution</strong></td>
+    <td>
+      <ul>
+        <li>Next workset</li>
+        <li><strong>Changes to solution set</strong></li>
+      </ul>
+    </td>
+  </tr>
+  <tr>
+    <td class="text-center"><strong>Iteration Result</strong></td>
+    <td class="text-center">Last partial solution</td>
+    <td class="text-center">Solution set state after last iteration</td>
+  </tr>
+  <tr>
+    <td class="text-center"><strong>Termination</strong></td>
+    <td>
+      <ul>
+        <li><strong>Maximum number of iterations</strong> (default)</li>
+        <li>Custom aggregator convergence</li>
+      </ul>
+    </td>
+    <td>
+      <ul>
+        <li><strong>Maximum number of iterations or empty workset</strong> (default)</li>
+        <li>Custom aggregator convergence</li>
+      </ul>
+    </td>
+  </tr>
+</table>
+
+
+* This will be replaced by the TOC
+{:toc}
+
+Iterate Operator
+----------------
+
+The **iterate operator** covers the *simple form of iterations*: in each iteration, the **step function** consumes the **entire input** (the *result of the previous iteration*, or the *initial data set*) and computes the **next version of the partial solution** using operators such as `map`, `reduce`, or `join`.
+
+<p class="text-center">
+    <img alt="Iterate Operator" width="60%" src="fig/iterations_iterate_operator.png" />
+</p>
+
+ 1. **Iteration Input**: Initial input for the *first iteration* from a *data source* or *previous operators*.
+ 2. **Step Function**: The step function will be executed in each iteration. It is an arbitrary data flow consisting of operators like `map`, `reduce`, `join`, etc. and depends on your specific task at hand.
+ 3. **Next Partial Solution**: In each iteration, the output of the step function will be fed back into the *next iteration*.
+ 4. **Iteration Result**: Output of the *last iteration* is written to a *data sink* or used as input to the *following operators*.
+
+There are multiple options to specify **termination conditions** for an iteration:
+
+ - **Maximum number of iterations**: Without any further conditions, the iteration will be executed this many times.
+ - **Custom aggregator convergence**: Iterations allow you to specify *custom aggregators* and *convergence criteria*. For example, you can sum up the number of emitted records (aggregator) and terminate if this number is zero (convergence criterion).
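The two termination conditions above can be made concrete with a minimal plain-Python model of the iterate operator. This is not Flink's API. `bulk_iterate`, `no_records_changed`, and the list-of-integers state are hypothetical names chosen for illustration. The step function receives the whole partial solution on every pass. An optional aggregator-style check can stop the loop before the maximum number of iterations is reached.

```python
from typing import Callable, List, Optional, Tuple

# Hypothetical plain-Python model of the iterate operator (not the Flink API).
Step = Callable[[List[int]], List[int]]
Converged = Callable[[List[int], List[int]], bool]


def bulk_iterate(initial: List[int], step: Step, max_iterations: int,
                 converged: Optional[Converged] = None) -> Tuple[List[int], int]:
    """Feed the entire partial solution through `step` until a termination
    condition holds; return the last partial solution and the iteration count."""
    partial = list(initial)
    iterations = 0
    while iterations < max_iterations:            # maximum number of iterations
        next_partial = step(partial)
        iterations += 1
        stop = converged is not None and converged(partial, next_partial)
        partial = next_partial                    # fed back into the next iteration
        if stop:                                  # custom convergence criterion
            break
    return partial, iterations


def no_records_changed(previous: List[int], current: List[int]) -> bool:
    """Aggregator plus convergence criterion: count the records that differ
    from the previous iteration and converge once that count is zero.
    Assumes the step function keeps record order and count stable."""
    changed = sum(1 for old, new in zip(previous, current) if old != new)
    return changed == 0


if __name__ == "__main__":
    print(bulk_iterate([1, 2, 3, 4, 5], lambda s: [i + 1 for i in s], 10))
    # ([11, 12, 13, 14, 15], 10)
    print(bulk_iterate([3, 1, 0], lambda s: [max(x - 1, 0) for x in s], 100,
                       no_records_changed))
    # ([0, 0, 0], 4)
```

The first call mirrors the incrementing-numbers example: ten iterations turn `1` to `5` into `11` to `15`. The second call stops after four iterations because its fourth step leaves every record unchanged.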
+
+You can also think about the iterate operator in pseudo-code:
+
+~~~java
+IterationState state = getInitialState();
+
+while (!terminationCriterion()) {
+    state = step(state);
+}
+
+setFinalState(state);
+~~~
+
+<div class="panel panel-default">
+    <div class="panel-body">
+    See the <strong><a href="index.html">Programming Guide</a> </strong> for details and code examples.</div>
+</div>
+
+### Example: Incrementing Numbers
+
+In the following example, we **iteratively increment a set of numbers**:
+
+<p class="text-center">
+    <img alt="Iterate Operator Example" width="60%" src="fig/iterations_iterate_operator_example.png" />
+</p>
+
+ 1. **Iteration Input**: The initial input is read from a data source and consists of five single-field records (integers `1` to `5`).
+ 2. **Step function**: The step function is a single `map` operator, which increments the integer field from `i` to `i+1`. It will be applied to every record of the input.
+ 3. **Next Partial Solution**: The output of the step function will be the output of the map operator, i.e. records with incremented integers.
+ 4. **Iteration Result**: After ten iterations, the initial numbers will have been incremented ten times, resulting in integers `11` to `15`.
+
+~~~
+// 1st           2nd                       10th
+map(1) -> 2      map(2) -> 3      ...      map(10) -> 11
+map(2) -> 3      map(3) -> 4      ...      map(11) -> 12
+map(3) -> 4      map(4) -> 5      ...      map(12) -> 13
+map(4) -> 5      map(5) -> 6      ...      map(13) -> 14
+map(5) -> 6      map(6) -> 7      ...      map(14) -> 15
+~~~
+
+Note that **1**, **2**, and **4** can be arbitrary data flows.
+
+
+Delta Iterate Operator
+----------------------
+
+The **delta iterate operator** covers the case of **incremental iterations**. Incremental iterations **selectively modify elements** of their **solution** and evolve the solution rather than fully recompute it.
+
+Where applicable, this leads to **more efficient algorithms**, because not every element in the solution set changes in each iteration. This allows you to **focus on the hot parts** of the solution and leave the **cold parts untouched**. Frequently, the majority of the solution cools down comparatively fast, and the later iterations operate only on a small subset of the data.
+
+<p class="text-center">
+    <img alt="Delta Iterate Operator" width="60%" src="fig/iterations_delta_iterate_operator.png" />
+</p>
+
+ 1. **Iteration Input**: The initial workset and solution set are read from *data sources* or *previous operators* as input to the first iteration.
+ 2. **Step Function**: The step function will be executed in each iteration. It is an arbitrary data flow consisting of operators like `map`, `reduce`, `join`, etc. and depends on your specific task at hand.
+ 3. **Next Workset/Update Solution Set**: The *next workset* drives the iterative computation and will be fed back into the *next iteration*. Furthermore, the solution set will be updated and implicitly forwarded (it does not need to be rebuilt). Both data sets can be updated by different operators of the step function.
+ 4. **Iteration Result**: After the *last iteration*, the *solution set* is written to a *data sink* or used as input to the *following operators*.
+
+The default **termination condition** for delta iterations is specified by the **empty workset convergence criterion** and a **maximum number of iterations**. The iteration will terminate when a produced *next workset* is empty or when the maximum number of iterations is reached. It is also possible to specify a **custom aggregator** and **convergence criterion**.
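The delta iteration semantics described above can be simulated in plain Python. This minimal sketch assumes minimum-ID propagation (connected components) as the step function, which is the same idea as the graph example further down. It is not Flink's API: `propagate_min_id` and the adjacency-dict graph are hypothetical. The solution set maps each vertex to its current minimum ID, and only vertices whose ID changed form the next workset. The loop stops on an empty workset or after `max_iterations`.

```python
from typing import Dict, List, Tuple

# Hypothetical plain-Python model of the delta iterate operator (not the Flink API).
Graph = Dict[int, List[int]]  # vertex ID -> neighbouring vertex IDs (undirected)


def propagate_min_id(graph: Graph, max_iterations: int = 100) -> Tuple[Dict[int, int], List[int]]:
    """Return the final solution set and the workset size at the start and
    after every iteration."""
    solution = {v: v for v in graph}  # initial solution set: every vertex keeps its own ID
    workset = set(graph)              # initial workset: all vertices
    workset_sizes = [len(workset)]
    iterations = 0
    while workset and iterations < max_iterations:  # default termination conditions
        iterations += 1
        # Step function: vertices in the workset send their current ID to neighbours.
        candidates: Dict[int, int] = {}
        for v in workset:
            for n in graph[v]:
                candidates[n] = min(candidates.get(n, solution[v]), solution[v])
        # Delta: only vertices that receive an ID smaller than their current one.
        delta = {n: c for n, c in candidates.items() if c < solution[n]}
        solution.update(delta)        # merge the changes into the solution set
        workset = set(delta)          # next workset: only the changed vertices
        workset_sizes.append(len(workset))
    return solution, workset_sizes


if __name__ == "__main__":
    # Hypothetical graph: an upper component {1, 2, 3, 4} and a lower one {5, 6, 7}.
    g = {1: [2], 2: [1, 3, 4], 3: [2, 4], 4: [2, 3],
         5: [6, 7], 6: [5, 7], 7: [5, 6]}
    print(propagate_min_id(g))
    # ({1: 1, 2: 1, 3: 1, 4: 1, 5: 5, 6: 5, 7: 5}, [7, 5, 2, 0])
```

With this particular graph, the workset shrinks from 7 to 5, then to 2, then to 0. The lower component leaves the workset after the second iteration, which is the hot/cold behaviour the delta iterate operator exploits.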
+
+You can also think about the delta iterate operator in pseudo-code:
+
+~~~java
+IterationState workset = getInitialState();
+IterationState solution = getInitialSolution();
+
+while (!terminationCriterion()) {
+    (delta, workset) = step(workset, solution);
+
+    solution.update(delta);
+}
+
+setFinalState(solution);
+~~~
+
+<div class="panel panel-default">
+    <div class="panel-body">
+    See the <strong><a href="index.html">programming guide</a></strong> for details and code examples.</div>
+</div>
+
+### Example: Propagate Minimum in Graph
+
+In the following example, every vertex has an **ID** and a **coloring**. Each vertex will propagate its vertex ID to neighboring vertices. The **goal** is to *assign the minimum ID to every vertex in a subgraph*. If a received ID is smaller than the current one, the vertex changes to the color of the vertex with the received ID. One application of this can be found in *community analysis* or *connected components* computation.
+
+<p class="text-center">
+    <img alt="Delta Iterate Operator Example" width="100%" src="fig/iterations_delta_iterate_operator_example.png" />
+</p>
+
+The **initial input** is set as **both workset and solution set.** In the above figure, the colors visualize the **evolution of the solution set**. With each iteration, the color of the minimum ID is spreading in the respective subgraph. At the same time, the amount of work (exchanged and compared vertex IDs) decreases with each iteration. This corresponds to the **decreasing size of the workset**, which goes from all seven vertices to zero after three iterations, at which time the iteration terminates. The **important observation** is that *the lower subgraph converges before the upper half* does and the delta iteration is able to capture this with the workset abstraction.
+
+In the upper subgraph **ID 1** (*orange*) is the **minimum ID**. In the **first iteration**, it will get propagated to vertex 2, which will subsequently change its color to orange. Vertices 3 and 4 will receive **ID 2** (in *yellow*) as their current minimum ID and change to yellow. Because the color of *vertex 1* didn't change in the first iteration, it can be skipped in the next workset.
+
+In the lower subgraph **ID 5** (*cyan*) is the **minimum ID**. All vertices of the lower subgraph will receive it in the first iteration. Again, we can skip the unchanged vertices (*vertex 5*) for the next workset.
+
+In the **2nd iteration**, the workset size has already decreased from seven to five elements (vertices 2, 3, 4, 6, and 7). These are part of the iteration and further propagate their current minimum IDs. After this iteration, the lower subgraph has already converged (**cold part** of the graph), as it has no elements in the workset, whereas the upper half needs a further iteration (**hot part** of the graph) for the two remaining workset elements (vertices 3 and 4).
+
+The iteration **terminates** when the workset is empty after the **3rd iteration**.
+
+<a href="#supersteps"></a>
+
+Superstep Synchronization
+-------------------------
+
+We referred to each execution of the step function of an iteration operator as *a single iteration*. In parallel setups, **multiple instances of the step function are evaluated in parallel** on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so-called **superstep**, which is also the granularity of synchronization. Therefore, *all* parallel tasks of an iteration need to complete the superstep before the next superstep is initialized. **Termination criteria** will also be evaluated at superstep barriers.
+
+<p class="text-center">
+    <img alt="Supersteps" width="50%" src="fig/iterations_supersteps.png" />
+</p>
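Superstep synchronization can be sketched in plain Python, assuming threads stand in for the parallel instances of a step function. Nothing here is Flink code: `run_supersteps` and its arguments are hypothetical. Each thread applies the step function to its own partition of the iteration state and then waits at a `threading.Barrier`. The barrier's `action` runs exactly once per superstep, after every instance has arrived. That is where the global (aggregator-based) termination criterion is evaluated.

```python
import threading
from typing import Callable, List, Tuple

# Hypothetical model of superstep (bulk synchronous) execution, not Flink internals.


def run_supersteps(partitions: List[List[int]],
                   step: Callable[[List[int]], List[int]],
                   max_supersteps: int) -> Tuple[List[List[int]], int]:
    parallelism = len(partitions)
    state = [list(p) for p in partitions]  # one partition of the iteration state per instance
    changed = [0] * parallelism            # per-instance aggregator contribution
    control = {"supersteps": 0, "stop": False}

    def at_barrier() -> None:
        # Runs once per superstep, after *all* instances reached the barrier.
        control["supersteps"] += 1
        if sum(changed) == 0:              # global termination criterion
            control["stop"] = True

    barrier = threading.Barrier(parallelism, action=at_barrier)

    def instance(i: int) -> None:
        for _ in range(max_supersteps):
            new = step(state[i])
            changed[i] = sum(1 for old, cur in zip(state[i], new) if old != cur)
            state[i] = new
            barrier.wait()                 # superstep barrier: wait for all instances
            if control["stop"]:
                break

    threads = [threading.Thread(target=instance, args=(i,)) for i in range(parallelism)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state, control["supersteps"]


def countdown(part: List[int]) -> List[int]:
    return [max(x - 1, 0) for x in part]


if __name__ == "__main__":
    print(run_supersteps([[3, 1], [0, 2]], countdown, max_supersteps=10))
    # ([[0, 0], [0, 0]], 4)
```

No instance can start superstep *n+1* before every instance has finished superstep *n*. Because of this, all threads observe the same termination decision and stop together.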
http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
new file mode 100644
index 0000000..74da97e
--- /dev/null
+++ b/docs/apis/batch/python.md
@@ -0,0 +1,610 @@
+---
+title: "Python Programming Guide"
+is_beta: true
+
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-id: python_api
+sub-nav-pos: 4
+sub-nav-title: Python API
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Analysis programs in Flink are regular programs that implement transformations on data sets
+(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
+sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for
+example write the data to (distributed) files, or to standard output (for example the command line
+terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Example Program
+---------------
+
+The following program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally.
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class Adder(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        count, word = iterator.next()
+        count += sum([x[0] for x in iterator])
+        collector.collect((count, word))
+
+if __name__ == "__main__":
+    env = get_environment()
+    data = env.from_elements("Who's there?",
+                             "I think I hear them. Stand, ho! Who's there?")
+
+    data \
+        .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
+        .group_by(1) \
+        .reduce_group(Adder(), (INT, STRING), combinable=True) \
+        .output()
+
+    env.execute(local=True)
+{% endhighlight %}
+
+{% top %}
+
+Program Skeleton
+----------------
+
+As we already saw in the example, Flink programs look like regular Python
+programs with an `if __name__ == "__main__":` block. Each program consists of the same basic parts:
+
+1. Obtain an `Environment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations, and
+5. Execute your program.
+
+We will now give an overview of each of those steps but please refer to the respective sections for
+more details.
+
+
+The `Environment` is the basis for all Flink programs. You can
+obtain one using the `get_environment()` function from the `flink.plan.Environment` module:
+
+{% highlight python %}
+get_environment()
+{% endhighlight %}
+
+For specifying data sources the execution environment has several methods
+to read from files. To just read a text file as a sequence of lines, you can use:
+
+{% highlight python %}
+env = get_environment()
+text = env.read_text("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataSet on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation function. For example,
+a map transformation looks like this:
+
+{% highlight python %}
+data.map(lambda x: x*2, INT)
+{% endhighlight %}
+
+This will create a new DataSet by doubling every value in the original DataSet.
+For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataSet that needs to be written to disk you can call one
+of these methods on DataSet:
+
+{% highlight python %}
+data.write_text("<file-path>", write_mode=WriteMode.NO_OVERWRITE)
+data.write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE)
+data.output()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local machine;
+it will output the contents of the DataSet to standard output. (Note that in
+a cluster, the result goes to the standard out stream of the cluster nodes and ends
+up in the *.out* files of the workers.)
+The first two do as the name suggests.
+Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
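To check what the WordCount example computes without Flink, you can trace the same chain of `flat_map`, `group_by(1)`, and `reduce_group(Adder())` with ordinary Python collections. This is only a local sketch for checking results by hand. `word_count` is a hypothetical helper, not part of the Flink Python API, and it ignores parallelism and combining.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Plain-Python trace of the WordCount data flow (no Flink involved)."""
    # flat_map: each line -> (1, word) records, lower-cased and split on whitespace
    records: List[Tuple[int, str]] = [(1, w) for line in lines for w in line.lower().split()]
    # group_by(1): group the records on the word field (tuple index 1)
    groups: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    for record in records:
        groups[record[1]].append(record)
    # reduce_group(Adder()): sum the counts (tuple index 0) within each group
    return {word: sum(count for count, _ in group) for word, group in groups.items()}


if __name__ == "__main__":
    counts = word_count(["Who's there?", "I think I hear them. Stand, ho! Who's there?"])
    for word, count in sorted(counts.items()):
        print(count, word)   # mimics output(): one (count, word) record per line
```

Because `split()` only splits on whitespace, punctuation stays attached to the words. For example, `there?` is counted twice as a single token.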
+
+Once you have specified the complete program, you need to call `execute` on
+the `Environment`. This will either execute on your local machine or submit your program
+for execution on a cluster, depending on how Flink was started. You can force
+a local execution by using `execute(local=True)`.
+
+{% top %}
+
+Project setup
+---------------
+
+Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages, is automatically distributed among the cluster via HDFS when running a job.
+
+The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed.
+
+{% top %}
+
+Lazy Evaluation
+---------------
+
+All Flink programs are executed lazily: When the program's main block is executed, the data loading
+and transformations do not happen directly. Rather, each operation is created and added to the
+program's plan. The operations are actually executed when `execute()` is invoked
+on the Environment object. Whether the program is executed locally or on a cluster depends
+on the environment of the program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink executes as one
+holistically planned unit.
+
+{% top %}
+
+
+Transformations
+---------------
+
+Data transformations transform one or more DataSets into a new DataSet. Programs can combine
+multiple transformations into sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The [transformations
+documentation](dataset_transformations.html) has a full description of all transformations with
+examples.
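A toy plan builder in plain Python can illustrate the idea behind lazy evaluation. It assumes nothing about how Flink represents plans internally, and `LazyPlan` and its methods are hypothetical. Calling `map` or `filter` only records an operation. No user function runs until `execute()` walks the recorded plan.

```python
from typing import Any, Callable, List

Operation = Callable[[List[Any]], List[Any]]


class LazyPlan:
    """Toy model of a lazily evaluated program plan (not Flink's implementation)."""

    def __init__(self, source: List[Any]) -> None:
        self._source = list(source)
        self._operations: List[Operation] = []

    def map(self, fn: Callable[[Any], Any]) -> "LazyPlan":
        # Record the transformation; nothing is computed yet.
        self._operations.append(lambda data: [fn(x) for x in data])
        return self

    def filter(self, predicate: Callable[[Any], bool]) -> "LazyPlan":
        self._operations.append(lambda data: [x for x in data if predicate(x)])
        return self

    def execute(self) -> List[Any]:
        # Only now is the data loaded and every recorded operation applied in order.
        data = list(self._source)
        for operation in self._operations:
            data = operation(data)
        return data


if __name__ == "__main__":
    calls: List[int] = []

    def double(x: int) -> int:
        calls.append(x)          # side effect shows *when* the function runs
        return x * 2

    plan = LazyPlan([1, 2, 3]).map(double).filter(lambda x: x > 2)
    print(calls)                 # [] because building the plan ran no user code
    print(plan.execute())        # [4, 6]
    print(calls)                 # [1, 2, 3]
```

A real engine gains more from deferring work than this toy does. With the whole plan visible before execution, it can optimize the plan as a unit.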
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight python %}
+data.map(lambda x: x * 2, INT)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight python %}
+data.flat_map(
+  lambda x,c: [(1,word) for word in x.lower().split()],
+  (INT, STRING))
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MapPartition</strong></td>
+      <td>
+        <p>Transforms a parallel partition in a single function call. The function gets the partition
+        as an <code>Iterator</code> and can produce an arbitrary number of result values. The number of
+        elements in each partition depends on the degree-of-parallelism and previous operations.</p>
+{% highlight python %}
+data.map_partition(lambda x,c: [value * 2 for value in x], INT)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function
+        returns true.</p>
+{% highlight python %}
+data.filter(lambda x: x > 1000)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
+{% highlight python %}
+data.reduce(lambda x,y : x + y)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
+        full data set, or on a grouped data set.</p>
+{% highlight python %}
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+data.reduce_group(Adder(), (INT, STRING))
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Join</strong></td>
+      <td>
+        Joins two data sets by creating all pairs of elements that are equal on their keys.
+        Optionally uses a JoinFunction to turn the pair of elements into a single element.
+        See <a href="#specifying-keys">keys</a> on how to define join keys.
+{% highlight python %}
+# In this case tuple fields are used as keys.
+# "0" is the join field on the first tuple
+# "1" is the join field on the second tuple.
+result = input1.join(input2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
+        fields and then joins the groups. The transformation function is called per pair of groups.
+        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
+{% highlight python %}
+data1.co_group(data2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
+        elements. Optionally uses a CrossFunction to turn the pair of elements into a single
+        element.</p>
+{% highlight python %}
+result = data1.cross(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets.</p>
+{% highlight python %}
+data.union(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+{% top %}
+
+
+Specifying Keys
+---------------
+
+Some transformations (like Join or CoGroup) require that a key is defined on
+their argument DataSets, and other transformations (Reduce, GroupReduce) allow the DataSet to be grouped on a key before they are
+applied.
+
+A DataSet is grouped as follows:
+{% highlight python %}
+reduced = data \
+  .group_by(<define key here>) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
+{% highlight python %}
+reduced = data \
+  .group_by(0) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples.
+The group-reduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight python %}
+grouped = data \
+  .group_by(0,1) \
+  .reduce(<do something>)
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second fields, therefore the reduce function will receive groups
+with the same value for both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple,
+specifying `group_by(<index of tuple>)` will cause the system to use the full tuple as a key.
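You can mimic the semantics of tuple-position keys in plain Python to see which records end up in the same group. `group_by_fields` is a hypothetical helper, not the Flink API. It computes a key from the selected positions of each record, so the key is "virtual" and derived from the data. It then collects records with equal keys, including the case where the selected field is itself a nested tuple.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_fields(data: List[tuple], *positions: int) -> Dict[tuple, List[tuple]]:
    """Group tuples on the fields at `positions` (a composite key if more than one)."""
    groups: Dict[tuple, List[tuple]] = defaultdict(list)
    for record in data:
        key = tuple(record[p] for p in positions)  # key derived from the record itself
        groups[key].append(record)
    return dict(groups)


if __name__ == "__main__":
    data = [("a", 1, 10), ("a", 1, 20), ("a", 2, 30), ("b", 1, 40)]
    print(group_by_fields(data, 0))     # grouped on the first field: 2 groups
    print(group_by_fields(data, 0, 1))  # composite key (field 0, field 1): 3 groups

    nested = [((1, 2), "x"), ((1, 2), "y"), ((1, 3), "z")]
    print(group_by_fields(nested, 0))   # the full nested tuple acts as the key
```

Grouping on positions `0, 1` splits the `"a"` records into two groups. Grouping on `0` alone keeps them together. With a nested tuple in field `0`, records are grouped only when the entire nested tuple is equal.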
+
+{% top %}
+
+
+Passing Functions to Flink
+--------------------------
+
+Certain operations require user-defined functions. All of these operations accept both lambda functions and rich functions as arguments.
+
+{% highlight python %}
+data.filter(lambda x: x > 5)
+{% endhighlight %}
+
+{% highlight python %}
+class Filter(FilterFunction):
+    def filter(self, value):
+        return value > 5
+
+data.filter(Filter())
+{% endhighlight %}
+
+Rich functions allow the use of imported functions, provide access to broadcast variables,
+can be parameterized using `__init__()`, and are the go-to option for complex functions.
+They are also the only way to define an optional `combine` function for a reduce operation.
+
+Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
+an iterable if the operation can return multiple values (all functions receiving a collector argument).
+
+Flink requires type information at the time when it prepares the program for execution
+(when the main method of the program is called). This is done by passing an exemplary
+object that has the desired type. This also holds for tuples.
+
+{% highlight python %}
+(INT, STRING)
+{% endhighlight %}
+
+This would denote a tuple containing an int and a string. Note that for operations that work strictly on tuples (like cross), no braces are required.
+
+There are a few constants defined in `flink.plan.Constants` that allow this in a more readable fashion.
+
+{% top %}
+
+Data Types
+----------
+
+Flink's Python API currently only supports primitive Python types (int, float, bool, string) and byte arrays.
+
+#### Tuples/Lists
+
+You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains
+a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type or a further tuple, resulting in nested tuples.
+ +{% highlight python %} +word_counts = env.from_elements(("hello", 1), ("world",2)) + +counts = word_counts.map(lambda x: x[1], INT) +{% endhighlight %} + +When working with operators that require a Key for grouping or matching records, +Tuples let you simply specify the positions of the fields to be used as key. You can specify more +than one position to use composite keys (see [Section Data Transformations](#transformations)). + +{% highlight python %} +wordCounts \ + .group_by(0) \ + .reduce(MyReduceFunction()) +{% endhighlight %} + +{% top %} + +Data Sources +------------ + +Data sources create the initial data sets, such as from files or from collections. + +File-based: + +- `read_text(path)` - Reads files line wise and returns them as Strings. +- `read_csv(path, type)` - Parses files of comma (or another char) delimited fields. + Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field + types. + +Collection-based: + +- `from_elements(*args)` - Creates a data set from a Seq. All elements + +**Examples** + +{% highlight python %} +env = get_environment + +# read text file from local files system +localLiens = env.read_text("file:#/path/to/my/textfile") + + read text file from a HDFS running at nnHost:nnPort +hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile") + + read a CSV file with three fields +csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE)) + + create a set from some given elements +values = env.from_elements("Foo", "bar", "foobar", "fubar") +{% endhighlight %} + +{% top %} + +Data Sinks +---------- + +Data sinks consume DataSets and are used to store or return them: + +- `write_text()` - Writes elements line-wise as Strings. The Strings are + obtained by calling the *str()* method of each element. +- `write_csv(...)` - Writes tuples as comma-separated value files. Row and field + delimiters are configurable. 
The value for each field comes from the *str()* method of the objects. +- `output()` - Prints the *str()* value of each element on the + standard out. + +A DataSet can be input to multiple operations. Programs can write or print a data set and at the +same time run additional transformations on them. + +**Examples** + +Standard data sink methods: + +{% highlight scala %} + write DataSet to a file on the local file system +textData.write_text("file:///my/result/on/localFS") + + write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort +textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS") + + write DataSet to a file and overwrite the file if it exists +textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE) + + tuples as lines with pipe as the separator "a|b|c" +values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|") + + this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines +values.write_text("file:///path/to/the/result/file") +{% endhighlight %} + +{% top %} + +Broadcast Variables +------------------- + +Broadcast variables allow you to make a data set available to all parallel instances of an +operation, in addition to the regular input of the operation. This is useful for auxiliary data +sets, or data-dependent parameterization. The data set will then be accessible at the operator as a +Collection. + +- **Broadcast**: broadcast sets are registered by name via `with_broadcast_set(DataSet, String)` +- **Access**: accessible via `self.context.get_broadcast_variable(String)` at the target operator + +{% highlight python %} +class MapperBcv(MapFunction): + def map(self, value): + factor = self.context.get_broadcast_variable("bcv")[0][0] + return value * factor + +# 1. The DataSet to be broadcasted +toBroadcast = env.from_elements(1, 2, 3) +data = env.from_elements("a", "b") + +# 2. 
Broadcast the DataSet +data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast) +{% endhighlight %} + +Make sure that the names (`bcv` in the previous example) match when registering and +accessing broadcasted data sets. + +**Note**: As the content of broadcast variables is kept in-memory on each node, it should not become +too large. For simpler things like scalar values you can simply parameterize the rich function. + +{% top %} + +Parallel Execution +------------------ + +This section describes how the parallel execution of programs can be configured in Flink. A Flink +program consists of multiple tasks (operators, data sources, and sinks). A task is split into +several parallel instances for execution and each parallel instance processes a subset of the task's +input data. The number of parallel instances of a task is called its *parallelism* or *degree of +parallelism (DOP)*. + +The degree of parallelism of a task can be specified in Flink on different levels. + +### Execution Environment Level + +Flink programs are executed in the context of an [execution environmentt](#program-skeleton). An +execution environment defines a default parallelism for all operators, data sources, and data sinks +it executes. Execution environment parallelism can be overwritten by explicitly configuring the +parallelism of an operator. + +The default parallelism of an execution environment can be specified by calling the +`set_degree_of_parallelism()` method. 
To execute all operators, data sources, and data sinks of the
+[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the
+execution environment as follows:
+
+{% highlight python %}
+env = get_environment()
+env.set_degree_of_parallelism(3)
+
+text.flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+
+env.execute()
+{% endhighlight %}
+
+### System Level
+
+A system-wide default parallelism for all execution environments can be defined by setting the
+`parallelism.default` property in `./conf/flink-conf.yaml`. See the
+[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details.
+
+{% top %}
+
+Executing Plans
+---------------
+
+To run the plan with Flink, go to your Flink distribution and run one of the pyflink scripts from the /bin folder:
+use pyflink2.sh for Python 2.7 and pyflink3.sh for Python 3.4. The script containing the plan has to be passed
+as the first argument, followed by any number of additional Python packages. Finally, after a `-` separator, you can pass
+additional arguments that will be fed to the script.
+
+{% highlight bash %}
+./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX>]][ - <param1>[ <paramX>]]
+{% endhighlight %}
+
+{% top %}
+
+Debugging
+---------
+
+If you are running Flink programs locally, you can debug your program by following this guide.
+First, enable debugging by setting the debug switch in the call: `env.execute(debug=True)`. After
+submitting your program, open the JobManager log file and look for a line that says
+`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>`. Now open `/tmp/flink` in your Python
+IDE and run `executor.py <port>`.
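The argument layout described under Executing Plans can be illustrated with a small, hypothetical Python helper. It is not part of Flink and is not how the pyflink scripts parse their arguments internally; it only sketches, under that assumption, how a command line splits into the plan script, the additional packages, and the script parameters that follow a lone `-`.

```python
from typing import List, NamedTuple


class PyFlinkInvocation(NamedTuple):
    """Hypothetical container for the parts of a pyflink command line."""
    script: str
    packages: List[str]
    params: List[str]


def split_pyflink_args(argv: List[str]) -> PyFlinkInvocation:
    """Split arguments following the documented layout:

        <Script> [<pathToPackage> ...] [- <param> ...]

    The first argument is the plan script, everything up to a lone "-" is an
    additional Python package, and everything after the "-" goes to the script.
    """
    if not argv:
        raise ValueError("the plan script must be passed as the first argument")
    script, rest = argv[0], argv[1:]
    if "-" in rest:
        sep = rest.index("-")
        packages, params = rest[:sep], rest[sep + 1:]
    else:
        packages, params = rest, []
    return PyFlinkInvocation(script, list(packages), list(params))


if __name__ == "__main__":
    # hypothetical file names, for illustration only
    inv = split_pyflink_args(["wordcount.py", "/opt/libs/mypkg", "-", "input.txt", "3"])
    print(inv.script, inv.packages, inv.params)
```

Treating only a standalone `-` as the separator keeps package paths and parameters that merely contain a dash intact.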
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/apis/best_practices.md b/docs/apis/best_practices.md
index 9a08222..9444fb6 100644
--- a/docs/apis/best_practices.md
+++ b/docs/apis/best_practices.md
@@ -1,5 +1,8 @@
 ---
 title: "Best Practices"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 4
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -20,9 +23,6 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-<a href="#top"></a>
-
-
 This page contains a collection of best practices for Flink programmers on how to solve
 frequently encountered problems.
 
@@ -397,4 +397,3 @@ Next, you need to put the following jar files into the `lib/` folder:
 * `logback-classic.jar`
 * `logback-core.jar`
 * `log4j-over-slf4j.jar`: This bridge needs to be present in the classpath for redirecting logging calls from Hadoop (which is using Log4j) to Slf4j.
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 1150123..e22e213 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -1,5 +1,8 @@
 ---
 title: "Command-Line Interface"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 5
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/apis/cluster_execution.md b/docs/apis/cluster_execution.md
index 54e1b41..05ee688 100644
--- a/docs/apis/cluster_execution.md
+++ b/docs/apis/cluster_execution.md
@@ -1,5 +1,8 @@
 ---
 title: "Cluster Execution"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -98,9 +101,9 @@ The latter version is recommended as it respects the classloader management in F
 
 To provide these dependencies not included by Flink we suggest two options with Maven.
 
-1. The maven assembly plugin builds a so-called uber-jar(executable jar) 
+1. The maven assembly plugin builds a so-called uber-jar(executable jar)
 containing all your dependencies.
-Assembly configuration is straight-forward, but the resulting jar might become bulky. See 
+Assembly configuration is straight-forward, but the resulting jar might become bulky. See
 [usage](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html).
 2. The maven unpack plugin, for unpacking the relevant parts of the dependencies and
 then package it with your code.
http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/connectors.md
----------------------------------------------------------------------
diff --git a/docs/apis/connectors.md b/docs/apis/connectors.md
new file mode 100644
index 0000000..ebacb33
--- /dev/null
+++ b/docs/apis/connectors.md
@@ -0,0 +1,241 @@
+---
+title: "Connectors"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* TOC
+{:toc}
+
+## Reading from file systems
+
+Flink has built-in support for the following file systems:
+
+| Filesystem                            | Scheme       | Notes  |
+| ------------------------------------- |--------------| ------ |
+| Hadoop Distributed File System (HDFS) | `hdfs://`    | All HDFS versions are supported |
+| Amazon S3                             | `s3://`      | Support through Hadoop file system implementation (see below) |
+| MapR file system                      | `maprfs://`  | The user has to manually place the required jar files in the `lib/` dir |
+| Tachyon                               | `tachyon://` | Support through Hadoop file system implementation (see below) |
+
+
+
+### Using Hadoop file system implementations
+
+Apache Flink allows users to use any file system implementing the `org.apache.hadoop.fs.FileSystem`
+interface.
There are Hadoop `FileSystem` implementations for
+
+- [S3](https://aws.amazon.com/s3/) (tested)
+- [Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector) (tested)
+- [Tachyon](http://tachyon-project.org/) (tested)
+- [XtreemFS](http://www.xtreemfs.org/) (tested)
+- FTP via [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html) (not tested)
+- and many more.
+
+In order to use a Hadoop file system with Flink, make sure that
+
+- the `flink-conf.yaml` has the `fs.hdfs.hadoopconf` property set to the Hadoop configuration directory.
+- the Hadoop configuration (in that directory) has an entry for the required file system. Examples for S3 and Tachyon are shown below.
+- the required classes for using the file system are available in the `lib/` folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink also respects the `HADOOP_CLASSPATH` environment variable to add Hadoop jar files to the classpath.
+
+#### Amazon S3
+
+For Amazon S3 support, add the following entries to the `core-site.xml` file:
+
+~~~xml
+<!-- configure the file system implementation -->
+<property>
+  <name>fs.s3.impl</name>
+  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
+</property>
+
+<!-- set your AWS access key ID -->
+<property>
+  <name>fs.s3.awsAccessKeyId</name>
+  <value>putKeyHere</value>
+</property>
+
+<!-- set your AWS secret access key -->
+<property>
+  <name>fs.s3.awsSecretAccessKey</name>
+  <value>putSecretHere</value>
+</property>
+~~~
+
+#### Tachyon
+
+For Tachyon support, add the following entry to the `core-site.xml` file:
+
+~~~xml
+<property>
+  <name>fs.tachyon.impl</name>
+  <value>tachyon.hadoop.TFS</value>
+</property>
+~~~
+
+
+## Connecting to other systems using Input/OutputFormat wrappers for Hadoop
+
+Apache Flink allows users to access many different systems as data sources or sinks.
+The system is designed for very easy extensibility.
Similar to Apache Hadoop, Flink has the concept
+of so-called `InputFormat`s and `OutputFormat`s.
+
+One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
+users to use all existing Hadoop input formats with Flink.
+
+This section shows some examples for connecting Flink to other systems.
+[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).
+
+## Avro support in Flink
+
+Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read from Avro files with Flink.
+Also, the serialization framework of Flink is able to handle classes generated from Avro schemas.
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+~~~java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataSet<User> usersDS = env.createInput(users);
+~~~
+
+Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection of these POJOs. For example:
+
+~~~java
+usersDS.groupBy("name")
+~~~
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data-intensive and thus probably slow to use.
+
+Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use the field as a join or grouping key.
+Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION type with only one member type (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
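To make the field-type rules above concrete, here is a small, hypothetical Python helper (not part of Flink or Avro) that inspects Avro field declarations in their JSON form and reports whether, following the behavior described in this section, the generated class would get a typed field rather than `Object`. Unions other than the two cases named above are conservatively reported as producing `Object`; that is an assumption of this sketch, not something the section states.

```python
import json
from typing import Any, Dict


def generates_typed_field(field: Dict[str, Any]) -> bool:
    """Hypothetical check following the rules described above.

    - a non-union type such as "double" -> typed field
    - a nullable union such as ["null", "double"] -> typed field
    - a single-member union such as ["double"] -> field of type Object
    Any other union is treated as Object (assumption of this sketch).
    """
    ftype = field["type"]
    if not isinstance(ftype, list):
        return True  # non-union type
    if len(ftype) == 2 and "null" in ftype:
        return True  # nullable type
    return False  # single-member or other union


fields = json.loads("""[
  {"name": "type_double_test", "type": "double"},
  {"name": "type_double_test", "type": ["double"]},
  {"name": "type_double_test", "type": ["null", "double"]}
]""")

for f in fields:
    print(f["type"], generates_typed_field(f))
```

Fields for which such a check returns `False` would end up as `Object` and therefore could not serve as join or grouping keys.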
+
+
+
+### Access Microsoft Azure Table Storage
+
+_Note: This example works starting from Flink 0.6-incubating_
+
+This example uses the `HadoopInputFormat` wrapper to access [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/) through an existing Hadoop input format implementation.
+
+1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central; therefore, we have to build the project ourselves.
+Execute the following commands:
+
+   ~~~bash
+   git clone https://github.com/mooso/azure-tables-hadoop.git
+   cd azure-tables-hadoop
+   mvn clean install
+   ~~~
+
+2. Set up a new Flink project using the quickstarts:
+
+   ~~~bash
+   curl https://flink.apache.org/q/quickstart.sh | bash
+   ~~~
+
+3. Add the following dependencies (in the `<dependencies>` section) to your `pom.xml` file:
+
+   ~~~xml
+   <dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-hadoop-compatibility</artifactId>
+       <version>{{site.version}}</version>
+   </dependency>
+   <dependency>
+     <groupId>com.microsoft.hadoop</groupId>
+     <artifactId>microsoft-hadoop-azure</artifactId>
+     <version>0.0.4</version>
+   </dependency>
+   ~~~
+
+   `flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
+   `microsoft-hadoop-azure` adds the project we built before to our project.
+
+The project is now ready for coding. We recommend importing the project into an IDE, such as Eclipse or IntelliJ (import it as a Maven project).
+Browse to the code of the `Job.java` file. It is an empty skeleton for a Flink job.
+
+Paste the following code into it, replacing the skeleton. Because the public class below is named `AzureTableExample`, rename the file to `AzureTableExample.java` (Java requires the file name to match the public class name).
+
+~~~java
+import java.util.Map;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import com.microsoft.hadoop.azure.AzureTableConfiguration;
+import com.microsoft.hadoop.azure.AzureTableInputFormat;
+import com.microsoft.hadoop.azure.WritableEntity;
+import com.microsoft.windowsazure.storage.table.EntityProperty;
+
+public class AzureTableExample {
+
+  public static void main(String[] args) throws Exception {
+    // set up the execution environment
+    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+    // create an AzureTableInputFormat, using a Hadoop input format wrapper
+    HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
+
+    // set the Account URI, something like: https://apacheflink.table.core.windows.net
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
+    // set the secret storage key here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
+    // set the table name here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
+
+    DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
+    // a small example of how to use the data in a mapper
+    DataSet<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
+      @Override
+      public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
+        System.err.println("--------------------------------\nKey = "+arg0.f0);
+        WritableEntity we = arg0.f1;
+
+        for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
+          System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
+        }
+
+        return arg0.f0.toString();
+      }
+    });
+
+    // print the result; print() triggers the program execution itself
+    fin.print();
+
+    // no explicit env.execute() call is needed: after print() there are no new
+    // data sinks left to execute, so calling env.execute() here would fail
+  }
+}
+~~~
+
+The example shows how to access an Azure table and turn its data into a Flink `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). You can then apply all of Flink's DataSet transformations to it.
+
+## Access MongoDB
+
+This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test).
+