http://git-wip-us.apache.org/repos/asf/flink-web/blob/9ec0a879/content/news/2015/02/09/streaming-example.html
----------------------------------------------------------------------
diff --git a/content/news/2015/02/09/streaming-example.html
b/content/news/2015/02/09/streaming-example.html
new file mode 100644
index 0000000..0b83ec4
--- /dev/null
+++ b/content/news/2015/02/09/streaming-example.html
@@ -0,0 +1,837 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <meta charset="utf-8">
+ <meta http-equiv="X-UA-Compatible" content="IE=edge">
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <!-- The above 3 meta tags *must* come first in the head; any other head
content must come *after* these tags -->
+ <title>Apache Flink: Introducing Flink Streaming</title>
+ <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
+ <link rel="icon" href="/favicon.ico" type="image/x-icon">
+
+ <!-- Bootstrap -->
+ <link rel="stylesheet"
href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+ <link rel="stylesheet" href="/css/flink.css">
+ <link rel="stylesheet" href="/css/syntax.css">
+
+ <!-- Blog RSS feed -->
+ <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml"
title="Apache Flink Blog: RSS feed" />
+
+ <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
+ <!-- We need to load jQuery in the header for custom Google Analytics
event tracking-->
+ <script
src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
+
+ <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media
queries -->
+ <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
+ <!--[if lt IE 9]>
+ <script
src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
+ <script
src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
+ <![endif]-->
+ </head>
+ <body>
+
+
+ <!-- Main content. -->
+ <div class="container">
+ <div class="row">
+
+
+ <div id="sidebar" class="col-sm-3">
+ <!-- Top navbar. -->
+ <nav class="navbar navbar-default">
+ <!-- The logo. -->
+ <div class="navbar-header">
+ <button type="button" class="navbar-toggle collapsed"
data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ </button>
+ <div class="navbar-logo">
+ <a href="/">
+ <img alt="Apache Flink" src="/img/navbar-brand-logo.png"
width="147" height="73">
+ </a>
+ </div>
+ </div><!-- /.navbar-header -->
+
+ <!-- The navigation links. -->
+ <div class="collapse navbar-collapse"
id="bs-example-navbar-collapse-1">
+ <ul class="nav navbar-nav navbar-main">
+
+ <!-- Downloads -->
+ <li class=""><a class="btn btn-info"
href="/downloads.html">Download Flink</a></li>
+
+ <!-- Overview -->
+ <li><a href="/index.html">Home</a></li>
+
+ <!-- Intro -->
+ <li><a href="/introduction.html">Introduction to Flink</a></li>
+
+ <!-- Use cases -->
+ <li><a href="/usecases.html">Flink Use Cases</a></li>
+
+ <!-- Powered by -->
+ <li><a href="/poweredby.html">Powered by Flink</a></li>
+
+ <!-- Ecosystem -->
+ <li><a href="/ecosystem.html">Ecosystem</a></li>
+
+ <!-- Community -->
+ <li><a href="/community.html">Community &amp; Project Info</a></li>
+
+ <!-- Contribute -->
+ <li><a href="/how-to-contribute.html">How to Contribute</a></li>
+
+ <!-- Blog -->
+ <li class=" active hidden-md hidden-sm"><a href="/blog/"><b>Flink
Blog</b></a></li>
+
+ <hr />
+
+
+
+ <!-- Documentation -->
+ <!-- <li>
+ <a
href="http://ci.apache.org/projects/flink/flink-docs-release-1.1"
target="_blank">Documentation <small><span class="glyphicon
glyphicon-new-window"></span></small></a>
+ </li> -->
+ <li class="dropdown">
+ <a class="dropdown-toggle" data-toggle="dropdown"
href="#">Documentation
+ <span class="caret"></span></a>
+ <ul class="dropdown-menu">
+ <li><a
href="http://ci.apache.org/projects/flink/flink-docs-release-1.1"
target="_blank">1.1 (Latest stable release) <small><span class="glyphicon
glyphicon-new-window"></span></small></a></li>
+ <li><a
href="http://ci.apache.org/projects/flink/flink-docs-release-1.2"
target="_blank">1.2 (Snapshot) <small><span class="glyphicon
glyphicon-new-window"></span></small></a></li>
+ </ul>
+ </li>
+
+ <!-- Quickstart -->
+ <li>
+ <a
href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/setup_quickstart.html"
target="_blank">Quickstart <small><span class="glyphicon
glyphicon-new-window"></span></small></a>
+ </li>
+
+ <!-- GitHub -->
+ <li>
+ <a href="https://github.com/apache/flink" target="_blank">Flink
on GitHub <small><span class="glyphicon
glyphicon-new-window"></span></small></a>
+ </li>
+
+
+
+
+
+
+ </ul>
+
+
+
+ <ul class="nav navbar-nav navbar-bottom">
+ <hr />
+
+ <!-- FAQ -->
+ <li ><a href="/faq.html">Project FAQ</a></li>
+
+ <!-- Twitter -->
+ <li><a href="https://twitter.com/apacheflink"
target="_blank">@ApacheFlink <small><span class="glyphicon
glyphicon-new-window"></span></small></a></li>
+
+ <!-- Visualizer -->
+ <li class=" hidden-md hidden-sm"><a href="/visualizer/"
target="_blank">Plan Visualizer <small><span class="glyphicon
glyphicon-new-window"></span></small></a></li>
+
+ </ul>
+ </div><!-- /.navbar-collapse -->
+ </nav>
+
+ </div>
+ <div class="col-sm-9">
+ <div class="row-fluid">
+ <div class="col-sm-12">
+ <div class="row">
+ <h1>Introducing Flink Streaming</h1>
+
+ <article>
+ <p>09 Feb 2015</p>
+
+<p>This post is the first of a series of blog posts on Flink Streaming,
+the recent addition to Apache Flink that makes it possible to analyze
+continuous data sources in addition to static files. Flink Streaming
+uses the pipelined Flink engine to process data streams in real time
+and offers a new API including definition of flexible windows.</p>
+
+<p>In this post, we go through an example that uses the Flink Streaming
+API to compute statistics on stock market data that arrive
+continuously and combine the stock market data with Twitter streams.
+See the <a
href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming
Programming
+Guide</a> for a
+detailed presentation of the Streaming API.</p>
+
+<p>First, we read a bunch of stock price streams and combine them into
+one stream of market data. We apply several transformations on this
+market data stream, like rolling aggregations per stock. Then we emit
+price warning alerts when the prices are rapidly changing. Moving
+towards more advanced features, we compute rolling correlations
+between the market data streams and a Twitter stream with stock mentions.</p>
+
+<p>For running the example implementation please use the <em>0.9-SNAPSHOT</em>
+version of Flink as a dependency. The full example code base can be
+found <a
href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala">here</a>
in Scala and <a
href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java">here</a>
in Java7.</p>
+
+<p><a href="#top"></a></p>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="reading-from-multiple-inputs">Reading from multiple inputs</h2>
+
+<p>First, let us create the stream of stock prices:</p>
+
+<ol>
+ <li>Read a socket stream of stock prices</li>
+ <li>Parse the text in the stream to create a stream of
<code>StockPrice</code> objects</li>
+ <li>Add four other sources tagged with the stock symbol.</li>
+ <li>Finally, merge the streams to create a unified stream.</li>
+</ol>
+
+<p><img alt="Reading from multiple inputs"
src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive
center-block" /></p>
+
+<div class="codetabs">
+ <div data-lang="scala">
+
+ <div class="highlight"><pre><code class="language-scala"
data-lang="scala"><span class="k">def</span> <span class="n">main</span><span
class="o">(</span><span class="n">args</span><span class="k">:</span> <span
class="kt">Array</span><span class="o">[</span><span
class="kt">String</span><span class="o">])</span> <span class="o">{</span>
+
+ <span class="k">val</span> <span class="n">env</span> <span
class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span
class="o">.</span><span class="n">getExecutionEnvironment</span>
+
+ <span class="c1">//Read from a socket stream and map it to StockPrice
objects</span>
+ <span class="k">val</span> <span class="n">socketStockStream</span> <span
class="k">=</span> <span class="n">env</span><span class="o">.</span><span
class="n">socketTextStream</span><span class="o">(</span><span
class="s">"localhost"</span><span class="o">,</span> <span
class="mi">9999</span><span class="o">).</span><span class="n">map</span><span
class="o">(</span><span class="n">x</span> <span class="k">=></span> <span
class="o">{</span>
+ <span class="k">val</span> <span class="n">split</span> <span
class="k">=</span> <span class="n">x</span><span class="o">.</span><span
class="n">split</span><span class="o">(</span><span
class="s">","</span><span class="o">)</span>
+ <span class="nc">StockPrice</span><span class="o">(</span><span
class="n">split</span><span class="o">(</span><span class="mi">0</span><span
class="o">),</span> <span class="n">split</span><span class="o">(</span><span
class="mi">1</span><span class="o">).</span><span
class="n">toDouble</span><span class="o">)</span>
+ <span class="o">})</span>
+
+ <span class="c1">//Generate other stock streams</span>
+ <span class="k">val</span> <span class="nc">SPX_Stream</span> <span
class="k">=</span> <span class="n">env</span><span class="o">.</span><span
class="n">addSource</span><span class="o">(</span><span
class="n">generateStock</span><span class="o">(</span><span
class="s">"SPX"</span><span class="o">)(</span><span
class="mi">10</span><span class="o">)</span> <span class="k">_</span><span
class="o">)</span>
+ <span class="k">val</span> <span class="nc">FTSE_Stream</span> <span
class="k">=</span> <span class="n">env</span><span class="o">.</span><span
class="n">addSource</span><span class="o">(</span><span
class="n">generateStock</span><span class="o">(</span><span
class="s">"FTSE"</span><span class="o">)(</span><span
class="mi">20</span><span class="o">)</span> <span class="k">_</span><span
class="o">)</span>
+ <span class="k">val</span> <span class="nc">DJI_Stream</span> <span
class="k">=</span> <span class="n">env</span><span class="o">.</span><span
class="n">addSource</span><span class="o">(</span><span
class="n">generateStock</span><span class="o">(</span><span
class="s">"DJI"</span><span class="o">)(</span><span
class="mi">30</span><span class="o">)</span> <span class="k">_</span><span
class="o">)</span>
+ <span class="k">val</span> <span class="nc">BUX_Stream</span> <span
class="k">=</span> <span class="n">env</span><span class="o">.</span><span
class="n">addSource</span><span class="o">(</span><span
class="n">generateStock</span><span class="o">(</span><span
class="s">"BUX"</span><span class="o">)(</span><span
class="mi">40</span><span class="o">)</span> <span class="k">_</span><span
class="o">)</span>
+
+ <span class="c1">//Merge all stock streams together</span>
+ <span class="k">val</span> <span class="n">stockStream</span> <span
class="k">=</span> <span class="n">socketStockStream</span><span
class="o">.</span><span class="n">merge</span><span class="o">(</span><span
class="nc">SPX_Stream</span><span class="o">,</span> <span
class="nc">FTSE_Stream</span><span class="o">,</span>
+ <span class="nc">DJI_Stream</span><span class="o">,</span> <span
class="nc">BUX_Stream</span><span class="o">)</span>
+
+ <span class="n">stockStream</span><span class="o">.</span><span
class="n">print</span><span class="o">()</span>
+
+ <span class="n">env</span><span class="o">.</span><span
class="n">execute</span><span class="o">(</span><span class="s">"Stock
stream"</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+ <div data-lang="java7">
+
+ <div class="highlight"><pre><code class="language-java"
data-lang="java"><span class="kd">public</span> <span class="kd">static</span>
<span class="kt">void</span> <span class="nf">main</span><span
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span
class="n">args</span><span class="o">)</span> <span class="kd">throws</span>
<span class="n">Exception</span> <span class="o">{</span>
+
+ <span class="kd">final</span> <span
class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span
class="o">=</span>
+ <span class="n">StreamExecutionEnvironment</span><span
class="o">.</span><span class="na">getExecutionEnvironment</span><span
class="o">();</span>
+
+ <span class="c1">//Read from a socket stream and map it to StockPrice
objects</span>
+ <span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">socketStockStream</span> <span class="o">=</span> <span
class="n">env</span>
+ <span class="o">.</span><span
class="na">socketTextStream</span><span class="o">(</span><span
class="s">"localhost"</span><span class="o">,</span> <span
class="mi">9999</span><span class="o">)</span>
+ <span class="o">.</span><span class="na">map</span><span
class="o">(</span><span class="k">new</span> <span
class="n">MapFunction</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">StockPrice</span><span class="o">>()</span> <span
class="o">{</span>
+ <span class="kd">private</span> <span
class="n">String</span><span class="o">[]</span> <span
class="n">tokens</span><span class="o">;</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span
class="n">StockPrice</span> <span class="nf">map</span><span
class="o">(</span><span class="n">String</span> <span
class="n">value</span><span class="o">)</span> <span class="kd">throws</span>
<span class="n">Exception</span> <span class="o">{</span>
+ <span class="n">tokens</span> <span class="o">=</span>
<span class="n">value</span><span class="o">.</span><span
class="na">split</span><span class="o">(</span><span
class="s">","</span><span class="o">);</span>
+ <span class="k">return</span> <span class="k">new</span>
<span class="nf">StockPrice</span><span class="o">(</span><span
class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span
class="o">],</span>
+ <span class="n">Double</span><span
class="o">.</span><span class="na">parseDouble</span><span
class="o">(</span><span class="n">tokens</span><span class="o">[</span><span
class="mi">1</span><span class="o">]));</span>
+ <span class="o">}</span>
+ <span class="o">});</span>
+
+ <span class="c1">//Generate other stock streams</span>
+ <span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">SPX_stream</span> <span class="o">=</span> <span
class="n">env</span><span class="o">.</span><span
class="na">addSource</span><span class="o">(</span><span class="k">new</span>
<span class="nf">StockSource</span><span class="o">(</span><span
class="s">"SPX"</span><span class="o">,</span> <span
class="mi">10</span><span class="o">));</span>
+ <span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">FTSE_stream</span> <span class="o">=</span> <span
class="n">env</span><span class="o">.</span><span
class="na">addSource</span><span class="o">(</span><span class="k">new</span>
<span class="nf">StockSource</span><span class="o">(</span><span
class="s">"FTSE"</span><span class="o">,</span> <span
class="mi">20</span><span class="o">));</span>
+ <span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">DJI_stream</span> <span class="o">=</span> <span
class="n">env</span><span class="o">.</span><span
class="na">addSource</span><span class="o">(</span><span class="k">new</span>
<span class="nf">StockSource</span><span class="o">(</span><span
class="s">"DJI"</span><span class="o">,</span> <span
class="mi">30</span><span class="o">));</span>
+ <span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">BUX_stream</span> <span class="o">=</span> <span
class="n">env</span><span class="o">.</span><span
class="na">addSource</span><span class="o">(</span><span class="k">new</span>
<span class="nf">StockSource</span><span class="o">(</span><span
class="s">"BUX"</span><span class="o">,</span> <span
class="mi">40</span><span class="o">));</span>
+
+ <span class="c1">//Merge all stock streams together</span>
+ <span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">stockStream</span> <span class="o">=</span> <span
class="n">socketStockStream</span>
+ <span class="o">.</span><span class="na">merge</span><span
class="o">(</span><span class="n">SPX_stream</span><span class="o">,</span>
<span class="n">FTSE_stream</span><span class="o">,</span> <span
class="n">DJI_stream</span><span class="o">,</span> <span
class="n">BUX_stream</span><span class="o">);</span>
+
+ <span class="n">stockStream</span><span class="o">.</span><span
class="na">print</span><span class="o">();</span>
+
+ <span class="n">env</span><span class="o">.</span><span
class="na">execute</span><span class="o">(</span><span class="s">"Stock
stream"</span><span class="o">);</span></code></pre></div>
+
+ </div>
+</div>
+
+<p>See
+<a
href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#data-sources">here</a>
+on how you can create streaming sources for Flink Streaming
+programs. Flink, of course, has support for reading in streams from
+<a
href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html">external
+sources</a>
+such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake
+of this example, the data streams are simply generated using the
+<code>generateStock</code> method:</p>
+
+<div class="codetabs">
+ <div data-lang="scala">
+
+ <div class="highlight"><pre><code class="language-scala"
data-lang="scala"><span class="k">val</span> <span class="n">symbols</span>
<span class="k">=</span> <span class="nc">List</span><span
class="o">(</span><span class="s">"SPX"</span><span
class="o">,</span> <span class="s">"FTSE"</span><span
class="o">,</span> <span class="s">"DJI"</span><span
class="o">,</span> <span class="s">"DJT"</span><span
class="o">,</span> <span class="s">"BUX"</span><span
class="o">,</span> <span class="s">"DAX"</span><span
class="o">,</span> <span class="s">"GOOG"</span><span
class="o">)</span>
+
+<span class="k">case</span> <span class="k">class</span> <span
class="nc">StockPrice</span><span class="o">(</span><span
class="n">symbol</span><span class="k">:</span> <span
class="kt">String</span><span class="o">,</span> <span
class="n">price</span><span class="k">:</span> <span
class="kt">Double</span><span class="o">)</span>
+
+<span class="k">def</span> <span class="n">generateStock</span><span
class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span
class="kt">String</span><span class="o">)(</span><span
class="n">sigma</span><span class="k">:</span> <span class="kt">Int</span><span
class="o">)(</span><span class="n">out</span><span class="k">:</span> <span
class="kt">Collector</span><span class="o">[</span><span
class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span>
<span class="o">{</span>
+ <span class="k">var</span> <span class="n">price</span> <span
class="k">=</span> <span class="mf">1000.</span>
+ <span class="k">while</span> <span class="o">(</span><span
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">price</span> <span class="k">=</span> <span
class="n">price</span> <span class="o">+</span> <span
class="nc">Random</span><span class="o">.</span><span
class="n">nextGaussian</span> <span class="o">*</span> <span
class="n">sigma</span>
+ <span class="n">out</span><span class="o">.</span><span
class="n">collect</span><span class="o">(</span><span
class="nc">StockPrice</span><span class="o">(</span><span
class="n">symbol</span><span class="o">,</span> <span
class="n">price</span><span class="o">))</span>
+ <span class="nc">Thread</span><span class="o">.</span><span
class="n">sleep</span><span class="o">(</span><span
class="nc">Random</span><span class="o">.</span><span
class="n">nextInt</span><span class="o">(</span><span
class="mi">200</span><span class="o">))</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+ <div data-lang="java7">
+
+ <div class="highlight"><pre><code class="language-java"
data-lang="java"><span class="kd">private</span> <span class="kd">static</span>
<span class="kd">final</span> <span class="n">ArrayList</span><span
class="o"><</span><span class="n">String</span><span class="o">></span>
<span class="n">SYMBOLS</span> <span class="o">=</span> <span
class="k">new</span> <span class="n">ArrayList</span><span
class="o"><</span><span class="n">String</span><span class="o">>(</span>
+ <span class="n">Arrays</span><span class="o">.</span><span
class="na">asList</span><span class="o">(</span><span
class="s">"SPX"</span><span class="o">,</span> <span
class="s">"FTSE"</span><span class="o">,</span> <span
class="s">"DJI"</span><span class="o">,</span> <span
class="s">"DJT"</span><span class="o">,</span> <span
class="s">"BUX"</span><span class="o">,</span> <span
class="s">"DAX"</span><span class="o">,</span> <span
class="s">"GOOG"</span><span class="o">));</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span
class="kd">class</span> <span class="nc">StockPrice</span> <span
class="kd">implements</span> <span class="n">Serializable</span> <span
class="o">{</span>
+
+ <span class="kd">public</span> <span class="n">String</span> <span
class="n">symbol</span><span class="o">;</span>
+ <span class="kd">public</span> <span class="n">Double</span> <span
class="n">price</span><span class="o">;</span>
+
+ <span class="kd">public</span> <span class="nf">StockPrice</span><span
class="o">()</span> <span class="o">{</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="nf">StockPrice</span><span
class="o">(</span><span class="n">String</span> <span
class="n">symbol</span><span class="o">,</span> <span class="n">Double</span>
<span class="n">price</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">symbol</span> <span class="o">=</span> <span
class="n">symbol</span><span class="o">;</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">price</span> <span class="o">=</span> <span
class="n">price</span><span class="o">;</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">String</span> <span
class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
+ <span class="k">return</span> <span
class="s">"StockPrice{"</span> <span class="o">+</span>
+ <span class="s">"symbol='"</span> <span
class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span
class="sc">'\''</span> <span class="o">+</span>
+ <span class="s">", count="</span> <span
class="o">+</span> <span class="n">price</span> <span class="o">+</span>
+ <span class="sc">'}'</span><span class="o">;</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">final</span> <span
class="kd">static</span> <span class="kd">class</span> <span
class="nc">StockSource</span> <span class="kd">implements</span> <span
class="n">SourceFunction</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span class="o">{</span>
+
+ <span class="kd">private</span> <span class="n">Double</span> <span
class="n">price</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">String</span> <span
class="n">symbol</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">Integer</span> <span
class="n">sigma</span><span class="o">;</span>
+
+ <span class="kd">public</span> <span class="nf">StockSource</span><span
class="o">(</span><span class="n">String</span> <span
class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span>
<span class="n">sigma</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">symbol</span> <span class="o">=</span> <span
class="n">symbol</span><span class="o">;</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">sigma</span> <span class="o">=</span> <span
class="n">sigma</span><span class="o">;</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">invoke</span><span class="o">(</span><span
class="n">Collector</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">collector</span><span class="o">)</span> <span
class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+ <span class="n">price</span> <span class="o">=</span> <span
class="n">DEFAULT_PRICE</span><span class="o">;</span>
+ <span class="n">Random</span> <span class="n">random</span> <span
class="o">=</span> <span class="k">new</span> <span
class="nf">Random</span><span class="o">();</span>
+
+ <span class="k">while</span> <span class="o">(</span><span
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">price</span> <span class="o">=</span> <span
class="n">price</span> <span class="o">+</span> <span
class="n">random</span><span class="o">.</span><span
class="na">nextGaussian</span><span class="o">()</span> <span
class="o">*</span> <span class="n">sigma</span><span class="o">;</span>
+ <span class="n">collector</span><span class="o">.</span><span
class="na">collect</span><span class="o">(</span><span class="k">new</span>
<span class="nf">StockPrice</span><span class="o">(</span><span
class="n">symbol</span><span class="o">,</span> <span
class="n">price</span><span class="o">));</span>
+ <span class="n">Thread</span><span class="o">.</span><span
class="na">sleep</span><span class="o">(</span><span
class="n">random</span><span class="o">.</span><span
class="na">nextInt</span><span class="o">(</span><span
class="mi">200</span><span class="o">));</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+</div>
+
+<p>To read from the text socket stream, please make sure that you have a
+socket running. For the sake of the example, executing the following
+command in a terminal does the job. You can get
+<a href="http://netcat.sourceforge.net/">netcat</a> here if it is not available
+on your machine.</p>
+
+<div class="highlight"><pre><code>nc -lk 9999
+</code></pre></div>
+
+<p>If we execute the program from our IDE, we see the system logs followed by the
+stock prices being generated:</p>
+
+<div class="highlight"><pre><code>INFO Job execution switched to status
RUNNING.
+INFO Socket Stream(1/1) switched to SCHEDULED
+INFO Socket Stream(1/1) switched to DEPLOYING
+INFO Custom Source(1/1) switched to SCHEDULED
+INFO Custom Source(1/1) switched to DEPLOYING
+…
+1> StockPrice{symbol='SPX', count=1011.3405732645239}
+2> StockPrice{symbol='SPX', count=1018.3381290039248}
+1> StockPrice{symbol='DJI', count=1036.7454894073978}
+3> StockPrice{symbol='DJI', count=1135.1170217478427}
+3> StockPrice{symbol='BUX', count=1053.667523187687}
+4> StockPrice{symbol='BUX', count=1036.552601487263}
+</code></pre></div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="window-aggregations">Window aggregations</h2>
+
+<p>We first compute aggregations on time-based windows of the
+data. Flink provides <a
href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html">flexible
windowing semantics</a> where windows can
+also be defined based on the count of records or any custom user-defined
+logic.</p>
+
+<p>We partition our stream into windows of 10 seconds and slide the
+window every 5 seconds. We compute three statistics every 5 seconds.
+The first is the minimum price of all stocks, the second is the
+maximum price per stock, and the third is the mean stock price
+(using a map window function). Aggregations and groupings can be
+performed on named fields of POJOs, making the code more readable.</p>
+
+<p><img alt="Basic windowing aggregations"
src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive
center-block" /></p>
+
+<div class="codetabs">
+
+ <div data-lang="scala">
+
+ <div class="highlight"><pre><code class="language-scala"
data-lang="scala"><span class="c1">//Define the desired time window</span>
+<span class="k">val</span> <span class="n">windowedStream</span> <span
class="k">=</span> <span class="n">stockStream</span>
+ <span class="o">.</span><span class="n">window</span><span
class="o">(</span><span class="nc">Time</span><span class="o">.</span><span
class="n">of</span><span class="o">(</span><span class="mi">10</span><span
class="o">,</span> <span class="nc">SECONDS</span><span
class="o">)).</span><span class="n">every</span><span class="o">(</span><span
class="nc">Time</span><span class="o">.</span><span class="n">of</span><span
class="o">(</span><span class="mi">5</span><span class="o">,</span> <span
class="nc">SECONDS</span><span class="o">))</span>
+
+<span class="c1">//Compute some simple statistics on a rolling window</span>
+<span class="k">val</span> <span class="n">lowest</span> <span
class="k">=</span> <span class="n">windowedStream</span><span
class="o">.</span><span class="n">minBy</span><span class="o">(</span><span
class="s">"price"</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">maxByStock</span> <span
class="k">=</span> <span class="n">windowedStream</span><span
class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span
class="s">"symbol"</span><span class="o">).</span><span
class="n">maxBy</span><span class="o">(</span><span
class="s">"price"</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">rollingMean</span> <span
class="k">=</span> <span class="n">windowedStream</span><span
class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span
class="s">"symbol"</span><span class="o">).</span><span
class="n">mapWindow</span><span class="o">(</span><span class="n">mean</span>
<span class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Compute the mean of a window</span>
+<span class="k">def</span> <span class="n">mean</span><span
class="o">(</span><span class="n">ts</span><span class="k">:</span> <span
class="kt">Iterable</span><span class="o">[</span><span
class="kt">StockPrice</span><span class="o">],</span> <span
class="n">out</span><span class="k">:</span> <span
class="kt">Collector</span><span class="o">[</span><span
class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span>
<span class="o">{</span>
+ <span class="k">if</span> <span class="o">(</span><span
class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span
class="o">)</span> <span class="o">{</span>
+ <span class="n">out</span><span class="o">.</span><span
class="n">collect</span><span class="o">(</span><span
class="nc">StockPrice</span><span class="o">(</span><span
class="n">ts</span><span class="o">.</span><span class="n">head</span><span
class="o">.</span><span class="n">symbol</span><span class="o">,</span> <span
class="n">ts</span><span class="o">.</span><span class="n">foldLeft</span><span
class="o">(</span><span class="mi">0</span><span class="k">:</span> <span
class="kt">Double</span><span class="o">)(</span><span class="k">_</span> <span
class="o">+</span> <span class="k">_</span><span class="o">.</span><span
class="n">price</span><span class="o">)</span> <span class="o">/</span> <span
class="n">ts</span><span class="o">.</span><span class="n">size</span><span
class="o">))</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+ <div data-lang="java7">
+
+ <div class="highlight"><pre><code class="language-java"
data-lang="java"><span class="c1">//Define the desired time window</span>
+<span class="n">WindowedDataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">windowedStream</span> <span class="o">=</span> <span
class="n">stockStream</span>
+ <span class="o">.</span><span class="na">window</span><span
class="o">(</span><span class="n">Time</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="mi">10</span><span
class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span
class="na">SECONDS</span><span class="o">))</span>
+ <span class="o">.</span><span class="na">every</span><span
class="o">(</span><span class="n">Time</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="mi">5</span><span
class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span
class="na">SECONDS</span><span class="o">));</span>
+
+<span class="c1">//Compute some simple statistics on a rolling window</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">lowest</span> <span class="o">=</span> <span
class="n">windowedStream</span><span class="o">.</span><span
class="na">minBy</span><span class="o">(</span><span
class="s">"price"</span><span class="o">).</span><span
class="na">flatten</span><span class="o">();</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">maxByStock</span> <span class="o">=</span> <span
class="n">windowedStream</span><span class="o">.</span><span
class="na">groupBy</span><span class="o">(</span><span
class="s">"symbol"</span><span class="o">)</span>
+ <span class="o">.</span><span class="na">maxBy</span><span
class="o">(</span><span class="s">"price"</span><span
class="o">).</span><span class="na">flatten</span><span class="o">();</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">rollingMean</span> <span class="o">=</span> <span
class="n">windowedStream</span><span class="o">.</span><span
class="na">groupBy</span><span class="o">(</span><span
class="s">"symbol"</span><span class="o">)</span>
+ <span class="o">.</span><span class="na">mapWindow</span><span
class="o">(</span><span class="k">new</span> <span
class="nf">WindowMean</span><span class="o">()).</span><span
class="na">flatten</span><span class="o">();</span>
+
+<span class="c1">//Compute the mean of a window</span>
+<span class="kd">public</span> <span class="kd">final</span> <span
class="kd">static</span> <span class="kd">class</span> <span
class="nc">WindowMean</span> <span class="kd">implements</span>
+ <span class="n">WindowMapFunction</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">,</span> <span
class="n">StockPrice</span><span class="o">></span> <span class="o">{</span>
+
+ <span class="kd">private</span> <span class="n">Double</span> <span
class="n">sum</span> <span class="o">=</span> <span class="mf">0.0</span><span
class="o">;</span>
+ <span class="kd">private</span> <span class="n">Integer</span> <span
class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span
class="o">;</span>
+ <span class="kd">private</span> <span class="n">String</span> <span
class="n">symbol</span> <span class="o">=</span> <span
class="s">""</span><span class="o">;</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">mapWindow</span><span class="o">(</span><span
class="n">Iterable</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">values</span><span class="o">,</span> <span
class="n">Collector</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">out</span><span class="o">)</span>
+ <span class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+
+ <span class="k">if</span> <span class="o">(</span><span
class="n">values</span><span class="o">.</span><span
class="na">iterator</span><span class="o">().</span><span
class="na">hasNext</span><span class="o">())</span> <span
class="o">{</span>
+ <span class="k">for</span> <span class="o">(</span><span
class="n">StockPrice</span> <span class="n">sp</span> <span class="o">:</span>
<span class="n">values</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">sum</span> <span class="o">+=</span> <span
class="n">sp</span><span class="o">.</span><span class="na">price</span><span
class="o">;</span>
+ <span class="n">symbol</span> <span class="o">=</span> <span
class="n">sp</span><span class="o">.</span><span class="na">symbol</span><span
class="o">;</span>
+ <span class="n">count</span><span class="o">++;</span>
+ <span class="o">}</span>
+ <span class="n">out</span><span class="o">.</span><span
class="na">collect</span><span class="o">(</span><span class="k">new</span>
<span class="nf">StockPrice</span><span class="o">(</span><span
class="n">symbol</span><span class="o">,</span> <span class="n">sum</span>
<span class="o">/</span> <span class="n">count</span><span class="o">));</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+</div>
+
+<p>Note that to print a windowed stream one has to flatten it first,
+thus getting rid of the windowing logic. For example, execute
+<code>maxByStock.flatten().print()</code> to print the stream of maximum
prices of
+ the time windows by stock. In Scala, <code>flatten()</code> is called
implicitly
+when needed.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="data-driven-windows">Data-driven windows</h2>
+
+<p>The most interesting event in the stream is when the price of a stock
+is changing rapidly. We can send a warning when a stock price changes
+by more than 5% since the last warning. To do that, we use a delta-based window
providing a
+threshold on when the computation will be triggered, a function to
+compute the difference, and a default value with which the first record
+is compared. We also create a <code>Count</code> data type to count the
warnings
+every 30 seconds.</p>
+
+<p><img alt="Data-driven windowing semantics"
src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive
center-block" /></p>
+
+<div class="codetabs">
+
+ <div data-lang="scala">
+
+ <div class="highlight"><pre><code class="language-scala"
data-lang="scala"><span class="k">case</span> <span class="k">class</span>
<span class="nc">Count</span><span class="o">(</span><span
class="n">symbol</span><span class="k">:</span> <span
class="kt">String</span><span class="o">,</span> <span
class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span
class="o">)</span>
+<span class="k">val</span> <span class="n">defaultPrice</span> <span
class="k">=</span> <span class="nc">StockPrice</span><span
class="o">(</span><span class="s">""</span><span class="o">,</span>
<span class="mi">1000</span><span class="o">)</span>
+
+<span class="c1">//Use delta policy to create price change warnings</span>
+<span class="k">val</span> <span class="n">priceWarnings</span> <span
class="k">=</span> <span class="n">stockStream</span><span
class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span
class="s">"symbol"</span><span class="o">)</span>
+ <span class="o">.</span><span class="n">window</span><span
class="o">(</span><span class="nc">Delta</span><span class="o">.</span><span
class="n">of</span><span class="o">(</span><span class="mf">0.05</span><span
class="o">,</span> <span class="n">priceChange</span><span class="o">,</span>
<span class="n">defaultPrice</span><span class="o">))</span>
+ <span class="o">.</span><span class="n">mapWindow</span><span
class="o">(</span><span class="n">sendWarning</span> <span
class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Count the number of warnings every half a minute</span>
+<span class="k">val</span> <span class="n">warningsPerStock</span> <span
class="k">=</span> <span class="n">priceWarnings</span><span
class="o">.</span><span class="n">map</span><span class="o">(</span><span
class="nc">Count</span><span class="o">(</span><span class="k">_</span><span
class="o">,</span> <span class="mi">1</span><span class="o">))</span>
+ <span class="o">.</span><span class="n">groupBy</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">)</span>
+ <span class="o">.</span><span class="n">window</span><span
class="o">(</span><span class="nc">Time</span><span class="o">.</span><span
class="n">of</span><span class="o">(</span><span class="mi">30</span><span
class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+ <span class="o">.</span><span class="n">sum</span><span
class="o">(</span><span class="s">"count"</span><span
class="o">)</span>
+
+<span class="k">def</span> <span class="n">priceChange</span><span
class="o">(</span><span class="n">p1</span><span class="k">:</span> <span
class="kt">StockPrice</span><span class="o">,</span> <span
class="n">p2</span><span class="k">:</span> <span
class="kt">StockPrice</span><span class="o">)</span><span class="k">:</span>
<span class="kt">Double</span> <span class="o">=</span> <span class="o">{</span>
+ <span class="nc">Math</span><span class="o">.</span><span
class="n">abs</span><span class="o">(</span><span class="n">p1</span><span
class="o">.</span><span class="n">price</span> <span class="o">/</span> <span
class="n">p2</span><span class="o">.</span><span class="n">price</span> <span
class="o">-</span> <span class="mi">1</span><span class="o">)</span>
+<span class="o">}</span>
+
+<span class="k">def</span> <span class="n">sendWarning</span><span
class="o">(</span><span class="n">ts</span><span class="k">:</span> <span
class="kt">Iterable</span><span class="o">[</span><span
class="kt">StockPrice</span><span class="o">],</span> <span
class="n">out</span><span class="k">:</span> <span
class="kt">Collector</span><span class="o">[</span><span
class="kt">String</span><span class="o">])</span> <span class="k">=</span>
<span class="o">{</span>
+ <span class="k">if</span> <span class="o">(</span><span
class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span
class="o">)</span> <span class="n">out</span><span class="o">.</span><span
class="n">collect</span><span class="o">(</span><span class="n">ts</span><span
class="o">.</span><span class="n">head</span><span class="o">.</span><span
class="n">symbol</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+ <div data-lang="java7">
+
+ <div class="highlight"><pre><code class="language-java"
data-lang="java"><span class="kd">private</span> <span class="kd">static</span>
<span class="kd">final</span> <span class="n">Double</span> <span
class="n">DEFAULT_PRICE</span> <span class="o">=</span> <span
class="mi">1000</span><span class="o">.;</span>
+<span class="kd">private</span> <span class="kd">static</span> <span
class="kd">final</span> <span class="n">StockPrice</span> <span
class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span
class="k">new</span> <span class="nf">StockPrice</span><span
class="o">(</span><span class="s">""</span><span class="o">,</span>
<span class="n">DEFAULT_PRICE</span><span class="o">);</span>
+
+<span class="c1">//Use delta policy to create price change warnings</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">priceWarnings</span> <span class="o">=</span> <span
class="n">stockStream</span><span class="o">.</span><span
class="na">groupBy</span><span class="o">(</span><span
class="s">"symbol"</span><span class="o">)</span>
+ <span class="o">.</span><span class="na">window</span><span
class="o">(</span><span class="n">Delta</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="mf">0.05</span><span
class="o">,</span> <span class="k">new</span> <span
class="n">DeltaFunction</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">>()</span> <span
class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">double</span> <span
class="nf">getDelta</span><span class="o">(</span><span
class="n">StockPrice</span> <span class="n">oldDataPoint</span><span
class="o">,</span> <span class="n">StockPrice</span> <span
class="n">newDataPoint</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">return</span> <span class="n">Math</span><span
class="o">.</span><span class="na">abs</span><span class="o">(</span><span
class="n">oldDataPoint</span><span class="o">.</span><span
class="na">price</span> <span class="o">/</span> <span
class="n">newDataPoint</span><span class="o">.</span><span
class="na">price</span> <span class="o">-</span> <span
class="mi">1</span><span class="o">);</span>
+ <span class="o">}</span>
+ <span class="o">},</span> <span class="n">DEFAULT_STOCK_PRICE</span><span
class="o">))</span>
+<span class="o">.</span><span class="na">mapWindow</span><span
class="o">(</span><span class="k">new</span> <span
class="nf">SendWarning</span><span class="o">()).</span><span
class="na">flatten</span><span class="o">();</span>
+
+<span class="c1">//Count the number of warnings every half a minute</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">Count</span><span class="o">></span> <span
class="n">warningsPerStock</span> <span class="o">=</span> <span
class="n">priceWarnings</span><span class="o">.</span><span
class="na">map</span><span class="o">(</span><span class="k">new</span> <span
class="n">MapFunction</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Count</span><span class="o">>()</span> <span class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">Count</span> <span
class="nf">map</span><span class="o">(</span><span class="n">String</span>
<span class="n">value</span><span class="o">)</span> <span
class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+ <span class="k">return</span> <span class="k">new</span> <span
class="nf">Count</span><span class="o">(</span><span
class="n">value</span><span class="o">,</span> <span class="mi">1</span><span
class="o">);</span>
+ <span class="o">}</span>
+<span class="o">}).</span><span class="na">groupBy</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">).</span><span class="na">window</span><span class="o">(</span><span
class="n">Time</span><span class="o">.</span><span class="na">of</span><span
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">SECONDS</span><span class="o">)).</span><span
class="na">sum</span><span class="o">(</span><span
class="s">"count"</span><span class="o">).</span><span
class="na">flatten</span><span class="o">();</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span
class="kd">class</span> <span class="nc">Count</span> <span
class="kd">implements</span> <span class="n">Serializable</span> <span
class="o">{</span>
+ <span class="kd">public</span> <span class="n">String</span> <span
class="n">symbol</span><span class="o">;</span>
+ <span class="kd">public</span> <span class="n">Integer</span> <span
class="n">count</span><span class="o">;</span>
+
+ <span class="kd">public</span> <span class="nf">Count</span><span
class="o">()</span> <span class="o">{</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="nf">Count</span><span
class="o">(</span><span class="n">String</span> <span
class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span>
<span class="n">count</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">symbol</span> <span class="o">=</span> <span
class="n">symbol</span><span class="o">;</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">count</span> <span class="o">=</span> <span
class="n">count</span><span class="o">;</span>
+ <span class="o">}</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">String</span> <span
class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
+ <span class="k">return</span> <span
class="s">"Count{"</span> <span class="o">+</span>
+ <span class="s">"symbol='"</span> <span
class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span
class="sc">'\''</span> <span class="o">+</span>
+ <span class="s">", count="</span> <span
class="o">+</span> <span class="n">count</span> <span class="o">+</span>
+ <span class="sc">'}'</span><span class="o">;</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span
class="kd">final</span> <span class="kd">class</span> <span
class="nc">SendWarning</span> <span class="kd">implements</span> <span
class="n">WindowMapFunction</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">,</span> <span
class="n">String</span><span class="o">></span> <span class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">mapWindow</span><span class="o">(</span><span
class="n">Iterable</span><span class="o"><</span><span
class="n">StockPrice</span><span class="o">></span> <span
class="n">values</span><span class="o">,</span> <span
class="n">Collector</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">out</span><span class="o">)</span>
+ <span class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+
+ <span class="k">if</span> <span class="o">(</span><span
class="n">values</span><span class="o">.</span><span
class="na">iterator</span><span class="o">().</span><span
class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+ <span class="n">out</span><span class="o">.</span><span
class="na">collect</span><span class="o">(</span><span
class="n">values</span><span class="o">.</span><span
class="na">iterator</span><span class="o">().</span><span
class="na">next</span><span class="o">().</span><span
class="na">symbol</span><span class="o">);</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="combining-with-a-twitter-stream">Combining with a Twitter stream</h2>
+
+<p>Next, we will read a Twitter stream and correlate it with our stock
+price stream. Flink has support for connecting to <a
href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/twitter.html">Twitter’s
+API</a>
+but for the sake of this example we generate dummy tweet data.</p>
+
+<p><img alt="Social media analytics" src="/img/blog/blog_social_media.png"
width="100%" class="img-responsive center-block" /></p>
+
+<div class="codetabs">
+
+ <div data-lang="scala">
+
+ <div class="highlight"><pre><code class="language-scala"
data-lang="scala"><span class="c1">//Read a stream of tweets</span>
+<span class="k">val</span> <span class="n">tweetStream</span> <span
class="k">=</span> <span class="n">env</span><span class="o">.</span><span
class="n">addSource</span><span class="o">(</span><span
class="n">generateTweets</span> <span class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Extract the stock symbols</span>
+<span class="k">val</span> <span class="n">mentionedSymbols</span> <span
class="k">=</span> <span class="n">tweetStream</span><span
class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span
class="n">tweet</span> <span class="k">=></span> <span
class="n">tweet</span><span class="o">.</span><span class="n">split</span><span
class="o">(</span><span class="s">" "</span><span class="o">))</span>
+ <span class="o">.</span><span class="n">map</span><span
class="o">(</span><span class="k">_</span><span class="o">.</span><span
class="n">toUpperCase</span><span class="o">())</span>
+ <span class="o">.</span><span class="n">filter</span><span
class="o">(</span><span class="n">symbols</span><span class="o">.</span><span
class="n">contains</span><span class="o">(</span><span class="k">_</span><span
class="o">))</span>
+
+<span class="c1">//Count the extracted symbols</span>
+<span class="k">val</span> <span class="n">tweetsPerStock</span> <span
class="k">=</span> <span class="n">mentionedSymbols</span><span
class="o">.</span><span class="n">map</span><span class="o">(</span><span
class="nc">Count</span><span class="o">(</span><span class="k">_</span><span
class="o">,</span> <span class="mi">1</span><span class="o">))</span>
+ <span class="o">.</span><span class="n">groupBy</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">)</span>
+ <span class="o">.</span><span class="n">window</span><span
class="o">(</span><span class="nc">Time</span><span class="o">.</span><span
class="n">of</span><span class="o">(</span><span class="mi">30</span><span
class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+ <span class="o">.</span><span class="n">sum</span><span
class="o">(</span><span class="s">"count"</span><span
class="o">)</span>
+
+<span class="k">def</span> <span class="n">generateTweets</span><span
class="o">(</span><span class="n">out</span><span class="k">:</span> <span
class="kt">Collector</span><span class="o">[</span><span
class="kt">String</span><span class="o">])</span> <span class="k">=</span>
<span class="o">{</span>
+ <span class="k">while</span> <span class="o">(</span><span
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">val</span> <span class="n">s</span> <span
class="k">=</span> <span class="k">for</span> <span class="o">(</span><span
class="n">i</span> <span class="k"><-</span> <span class="mi">1</span> <span
class="n">to</span> <span class="mi">3</span><span class="o">)</span> <span
class="k">yield</span> <span class="o">(</span><span
class="n">symbols</span><span class="o">(</span><span
class="nc">Random</span><span class="o">.</span><span
class="n">nextInt</span><span class="o">(</span><span
class="n">symbols</span><span class="o">.</span><span
class="n">size</span><span class="o">)))</span>
+ <span class="n">out</span><span class="o">.</span><span
class="n">collect</span><span class="o">(</span><span class="n">s</span><span
class="o">.</span><span class="n">mkString</span><span class="o">(</span><span
class="s">" "</span><span class="o">))</span>
+ <span class="nc">Thread</span><span class="o">.</span><span
class="n">sleep</span><span class="o">(</span><span
class="nc">Random</span><span class="o">.</span><span
class="n">nextInt</span><span class="o">(</span><span
class="mi">500</span><span class="o">))</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+ <div data-lang="java7">
+
+ <div class="highlight"><pre><code class="language-java"
data-lang="java"><span class="c1">//Read a stream of tweets</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">tweetStream</span> <span class="o">=</span> <span
class="n">env</span><span class="o">.</span><span
class="na">addSource</span><span class="o">(</span><span class="k">new</span>
<span class="nf">TweetSource</span><span class="o">());</span>
+
+<span class="c1">//Extract the stock symbols</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">mentionedSymbols</span> <span class="o">=</span> <span
class="n">tweetStream</span><span class="o">.</span><span
class="na">flatMap</span><span class="o">(</span>
+ <span class="k">new</span> <span class="n">FlatMapFunction</span><span
class="o"><</span><span class="n">String</span><span class="o">,</span>
<span class="n">String</span><span class="o">>()</span> <span
class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">flatMap</span><span class="o">(</span><span class="n">String</span>
<span class="n">value</span><span class="o">,</span> <span
class="n">Collector</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">out</span><span class="o">)</span> <span class="kd">throws</span>
<span class="n">Exception</span> <span class="o">{</span>
+ <span class="n">String</span><span class="o">[]</span> <span
class="n">words</span> <span class="o">=</span> <span
class="n">value</span><span class="o">.</span><span
class="na">split</span><span class="o">(</span><span class="s">"
"</span><span class="o">);</span>
+ <span class="k">for</span> <span class="o">(</span><span
class="n">String</span> <span class="n">word</span> <span class="o">:</span>
<span class="n">words</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">out</span><span class="o">.</span><span
class="na">collect</span><span class="o">(</span><span
class="n">word</span><span class="o">.</span><span
class="na">toUpperCase</span><span class="o">());</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+<span class="o">}).</span><span class="na">filter</span><span
class="o">(</span><span class="k">new</span> <span
class="n">FilterFunction</span><span class="o"><</span><span
class="n">String</span><span class="o">>()</span> <span class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">boolean</span> <span
class="nf">filter</span><span class="o">(</span><span class="n">String</span>
<span class="n">value</span><span class="o">)</span> <span
class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+ <span class="k">return</span> <span class="n">SYMBOLS</span><span
class="o">.</span><span class="na">contains</span><span class="o">(</span><span
class="n">value</span><span class="o">);</span>
+ <span class="o">}</span>
+<span class="o">});</span>
+
+<span class="c1">//Count the extracted symbols</span>
+<span class="n">DataStream</span><span class="o"><</span><span
class="n">Count</span><span class="o">></span> <span
class="n">tweetsPerStock</span> <span class="o">=</span> <span
class="n">mentionedSymbols</span><span class="o">.</span><span
class="na">map</span><span class="o">(</span><span class="k">new</span> <span
class="n">MapFunction</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Count</span><span class="o">>()</span> <span class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">Count</span> <span
class="nf">map</span><span class="o">(</span><span class="n">String</span>
<span class="n">value</span><span class="o">)</span> <span
class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+ <span class="k">return</span> <span class="k">new</span> <span
class="nf">Count</span><span class="o">(</span><span
class="n">value</span><span class="o">,</span> <span class="mi">1</span><span
class="o">);</span>
+ <span class="o">}</span>
+<span class="o">}).</span><span class="na">groupBy</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">).</span><span class="na">window</span><span class="o">(</span><span
class="n">Time</span><span class="o">.</span><span class="na">of</span><span
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">SECONDS</span><span class="o">)).</span><span
class="na">sum</span><span class="o">(</span><span
class="s">"count"</span><span class="o">).</span><span
class="na">flatten</span><span class="o">();</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span
class="kd">final</span> <span class="kd">class</span> <span
class="nc">TweetSource</span> <span class="kd">implements</span> <span
class="n">SourceFunction</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span class="o">{</span>
+ <span class="n">Random</span> <span class="n">random</span><span
class="o">;</span>
+ <span class="n">StringBuilder</span> <span
class="n">stringBuilder</span><span class="o">;</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">invoke</span><span class="o">(</span><span
class="n">Collector</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">collector</span><span class="o">)</span> <span
class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+ <span class="n">random</span> <span class="o">=</span> <span
class="k">new</span> <span class="nf">Random</span><span class="o">();</span>
+ <span class="n">stringBuilder</span> <span class="o">=</span> <span
class="k">new</span> <span class="nf">StringBuilder</span><span
class="o">();</span>
+
+ <span class="k">while</span> <span class="o">(</span><span
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">stringBuilder</span><span class="o">.</span><span
class="na">setLength</span><span class="o">(</span><span
class="mi">0</span><span class="o">);</span>
+ <span class="k">for</span> <span class="o">(</span><span
class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span
class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span
class="o"><</span> <span class="mi">3</span><span class="o">;</span> <span
class="n">i</span><span class="o">++)</span> <span class="o">{</span>
+ <span class="n">stringBuilder</span><span
class="o">.</span><span class="na">append</span><span class="o">(</span><span
class="s">" "</span><span class="o">);</span>
+ <span class="n">stringBuilder</span><span
class="o">.</span><span class="na">append</span><span class="o">(</span><span
class="n">SYMBOLS</span><span class="o">.</span><span
class="na">get</span><span class="o">(</span><span class="n">random</span><span
class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span
class="n">SYMBOLS</span><span class="o">.</span><span
class="na">size</span><span class="o">())));</span>
+ <span class="o">}</span>
+ <span class="n">collector</span><span class="o">.</span><span
class="na">collect</span><span class="o">(</span><span
class="n">stringBuilder</span><span class="o">.</span><span
class="na">toString</span><span class="o">());</span>
+ <span class="n">Thread</span><span class="o">.</span><span
class="na">sleep</span><span class="o">(</span><span class="mi">500</span><span
class="o">);</span>
+ <span class="o">}</span>
+
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="streaming-joins">Streaming joins</h2>
+
+<p>Finally, we join real-time tweets and stock prices and compute a
+rolling correlation between the number of price warnings and the
+number of mentions of a given stock in the Twitter stream. As both of
+these data streams are potentially infinite, we apply the join on a
+30-second window.</p>
+
+<p><img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%"
class="img-responsive center-block" /></p>
+
+<div class="codetabs">
+
+ <div data-lang="scala">
+
+ <div class="highlight"><pre><code class="language-scala"
data-lang="scala"><span class="c1">//Join warnings and parsed tweets</span>
+<span class="k">val</span> <span class="n">tweetsAndWarning</span> <span
class="k">=</span> <span class="n">warningsPerStock</span><span
class="o">.</span><span class="n">join</span><span class="o">(</span><span
class="n">tweetsPerStock</span><span class="o">)</span>
+ <span class="o">.</span><span class="n">onWindow</span><span
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span
class="nc">SECONDS</span><span class="o">)</span>
+ <span class="o">.</span><span class="n">where</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">)</span>
+ <span class="o">.</span><span class="n">equalTo</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">)</span> <span class="o">{</span> <span class="o">(</span><span
class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span
class="o">)</span> <span class="k">=></span> <span class="o">(</span><span
class="n">c1</span><span class="o">.</span><span class="n">count</span><span
class="o">,</span> <span class="n">c2</span><span class="o">.</span><span
class="n">count</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">rollingCorrelation</span> <span
class="k">=</span> <span class="n">tweetsAndWarning</span><span
class="o">.</span><span class="n">window</span><span class="o">(</span><span
class="nc">Time</span><span class="o">.</span><span class="n">of</span><span
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span
class="nc">SECONDS</span><span class="o">))</span>
+ <span class="o">.</span><span class="n">mapWindow</span><span
class="o">(</span><span class="n">computeCorrelation</span> <span
class="k">_</span><span class="o">)</span>
+
+<span class="n">rollingCorrelation</span> <span class="n">print</span>
+
+<span class="c1">//Compute rolling correlation</span>
+<span class="k">def</span> <span class="n">computeCorrelation</span><span
class="o">(</span><span class="n">input</span><span class="k">:</span> <span
class="kt">Iterable</span><span class="o">[(</span><span class="kt">Int</span>,
<span class="kt">Int</span><span class="o">)],</span> <span
class="n">out</span><span class="k">:</span> <span
class="kt">Collector</span><span class="o">[</span><span
class="kt">Double</span><span class="o">])</span> <span class="k">=</span>
<span class="o">{</span>
+ <span class="k">if</span> <span class="o">(</span><span
class="n">input</span><span class="o">.</span><span
class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">val</span> <span class="n">var1</span> <span
class="k">=</span> <span class="n">input</span><span class="o">.</span><span
class="n">map</span><span class="o">(</span><span class="k">_</span><span
class="o">.</span><span class="n">_1</span><span class="o">)</span>
+ <span class="k">val</span> <span class="n">mean1</span> <span
class="k">=</span> <span class="n">average</span><span class="o">(</span><span
class="n">var1</span><span class="o">)</span>
+ <span class="k">val</span> <span class="n">var2</span> <span
class="k">=</span> <span class="n">input</span><span class="o">.</span><span
class="n">map</span><span class="o">(</span><span class="k">_</span><span
class="o">.</span><span class="n">_2</span><span class="o">)</span>
+ <span class="k">val</span> <span class="n">mean2</span> <span
class="k">=</span> <span class="n">average</span><span class="o">(</span><span
class="n">var2</span><span class="o">)</span>
+
+ <span class="k">val</span> <span class="n">cov</span> <span
class="k">=</span> <span class="n">average</span><span class="o">(</span><span
class="n">var1</span><span class="o">.</span><span class="n">zip</span><span
class="o">(</span><span class="n">var2</span><span class="o">).</span><span
class="n">map</span><span class="o">(</span><span class="n">xy</span> <span
class="k">=></span> <span class="o">(</span><span class="n">xy</span><span
class="o">.</span><span class="n">_1</span> <span class="o">-</span> <span
class="n">mean1</span><span class="o">)</span> <span class="o">*</span> <span
class="o">(</span><span class="n">xy</span><span class="o">.</span><span
class="n">_2</span> <span class="o">-</span> <span class="n">mean2</span><span
class="o">)))</span>
+ <span class="k">val</span> <span class="n">d1</span> <span
class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span
class="n">sqrt</span><span class="o">(</span><span
class="n">average</span><span class="o">(</span><span
class="n">var1</span><span class="o">.</span><span class="n">map</span><span
class="o">(</span><span class="n">x</span> <span class="k">=></span> <span
class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span
class="o">((</span><span class="n">x</span> <span class="o">-</span> <span
class="n">mean1</span><span class="o">),</span> <span class="mi">2</span><span
class="o">))))</span>
+ <span class="k">val</span> <span class="n">d2</span> <span
class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span
class="n">sqrt</span><span class="o">(</span><span
class="n">average</span><span class="o">(</span><span
class="n">var2</span><span class="o">.</span><span class="n">map</span><span
class="o">(</span><span class="n">x</span> <span class="k">=></span> <span
class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span
class="o">((</span><span class="n">x</span> <span class="o">-</span> <span
class="n">mean2</span><span class="o">),</span> <span class="mi">2</span><span
class="o">))))</span>
+
+ <span class="n">out</span><span class="o">.</span><span
class="n">collect</span><span class="o">(</span><span class="n">cov</span>
<span class="o">/</span> <span class="o">(</span><span class="n">d1</span>
<span class="o">*</span> <span class="n">d2</span><span class="o">))</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+ <div data-lang="java7">
+
+ <div class="highlight"><pre><code class="language-java"
data-lang="java"><span class="c1">//Join warnings and parsed tweets</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;&gt;</span> <span
class="n">tweetsAndWarning</span> <span class="o">=</span> <span
class="n">warningsPerStock</span>
+ <span class="o">.</span><span class="na">join</span><span
class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span>
+ <span class="o">.</span><span class="na">onWindow</span><span
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">SECONDS</span><span class="o">)</span>
+ <span class="o">.</span><span class="na">where</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">)</span>
+ <span class="o">.</span><span class="na">equalTo</span><span
class="o">(</span><span class="s">"symbol"</span><span
class="o">)</span>
+ <span class="o">.</span><span class="na">with</span><span
class="o">(</span><span class="k">new</span> <span
class="n">JoinFunction</span><span class="o">&lt;</span><span
class="n">Count</span><span class="o">,</span> <span
class="n">Count</span><span class="o">,</span> <span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;&gt;()</span> <span
class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">Tuple2</span><span
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span>
<span class="n">Integer</span><span class="o">&gt;</span> <span
class="nf">join</span><span class="o">(</span><span class="n">Count</span>
<span class="n">first</span><span class="o">,</span> <span
class="n">Count</span> <span class="n">second</span><span class="o">)</span>
<span class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+ <span class="k">return</span> <span class="k">new</span> <span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;(</span><span
class="n">first</span><span class="o">.</span><span
class="na">count</span><span class="o">,</span> <span
class="n">second</span><span class="o">.</span><span
class="na">count</span><span class="o">);</span>
+ <span class="o">}</span>
+ <span class="o">});</span>
+
+<span class="c1">//Compute rolling correlation</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span
class="n">Double</span><span class="o">&gt;</span> <span
class="n">rollingCorrelation</span> <span class="o">=</span> <span
class="n">tweetsAndWarning</span>
+ <span class="o">.</span><span class="na">window</span><span
class="o">(</span><span class="n">Time</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="mi">30</span><span
class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span
class="na">SECONDS</span><span class="o">))</span>
+ <span class="o">.</span><span class="na">mapWindow</span><span
class="o">(</span><span class="k">new</span> <span
class="nf">WindowCorrelation</span><span class="o">());</span>
+
+<span class="n">rollingCorrelation</span><span class="o">.</span><span
class="na">print</span><span class="o">();</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span
class="kd">final</span> <span class="kd">class</span> <span
class="nc">WindowCorrelation</span>
+ <span class="kd">implements</span> <span
class="n">WindowMapFunction</span><span class="o">&lt;</span><span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;,</span> <span
class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
+
+ <span class="kd">private</span> <span class="n">Integer</span> <span
class="n">leftSum</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">Integer</span> <span
class="n">rightSum</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">Integer</span> <span
class="n">count</span><span class="o">;</span>
+
+ <span class="kd">private</span> <span class="n">Double</span> <span
class="n">leftMean</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">Double</span> <span
class="n">rightMean</span><span class="o">;</span>
+
+ <span class="kd">private</span> <span class="n">Double</span> <span
class="n">cov</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">Double</span> <span
class="n">leftSd</span><span class="o">;</span>
+ <span class="kd">private</span> <span class="n">Double</span> <span
class="n">rightSd</span><span class="o">;</span>
+
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">mapWindow</span><span class="o">(</span><span
class="n">Iterable</span><span class="o">&lt;</span><span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;&gt;</span> <span
class="n">values</span><span class="o">,</span> <span
class="n">Collector</span><span class="o">&lt;</span><span
class="n">Double</span><span class="o">&gt;</span> <span
class="n">out</span><span class="o">)</span>
+ <span class="kd">throws</span> <span class="n">Exception</span> <span
class="o">{</span>
+
+ <span class="n">leftSum</span> <span class="o">=</span> <span
class="mi">0</span><span class="o">;</span>
+ <span class="n">rightSum</span> <span class="o">=</span> <span
class="mi">0</span><span class="o">;</span>
+ <span class="n">count</span> <span class="o">=</span> <span
class="mi">0</span><span class="o">;</span>
+
+ <span class="n">cov</span> <span class="o">=</span> <span
class="mi">0</span><span class="o">.;</span>
+ <span class="n">leftSd</span> <span class="o">=</span> <span
class="mi">0</span><span class="o">.;</span>
+ <span class="n">rightSd</span> <span class="o">=</span> <span
class="mi">0</span><span class="o">.;</span>
+
+ <span class="c1">//compute mean for both sides, save count</span>
+ <span class="k">for</span> <span class="o">(</span><span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span>
<span class="o">:</span> <span class="n">values</span><span class="o">)</span>
<span class="o">{</span>
+ <span class="n">leftSum</span> <span class="o">+=</span> <span
class="n">pair</span><span class="o">.</span><span class="na">f0</span><span
class="o">;</span>
+ <span class="n">rightSum</span> <span class="o">+=</span> <span
class="n">pair</span><span class="o">.</span><span class="na">f1</span><span
class="o">;</span>
+ <span class="n">count</span><span class="o">++;</span>
+ <span class="o">}</span>
+
+ <span class="n">leftMean</span> <span class="o">=</span> <span
class="n">leftSum</span><span class="o">.</span><span
class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span>
<span class="n">count</span><span class="o">;</span>
+ <span class="n">rightMean</span> <span class="o">=</span> <span
class="n">rightSum</span><span class="o">.</span><span
class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span>
<span class="n">count</span><span class="o">;</span>
+
+ <span class="c1">//compute covariance &amp; std. deviations</span>
+ <span class="k">for</span> <span class="o">(</span><span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span>
<span class="o">:</span> <span class="n">values</span><span class="o">)</span>
<span class="o">{</span>
+ <span class="n">cov</span> <span class="o">+=</span> <span
class="o">(</span><span class="n">pair</span><span class="o">.</span><span
class="na">f0</span> <span class="o">-</span> <span
class="n">leftMean</span><span class="o">)</span> <span class="o">*</span>
<span class="o">(</span><span class="n">pair</span><span
class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span
class="n">rightMean</span><span class="o">)</span> <span class="o">/</span>
<span class="n">count</span><span class="o">;</span>
+ <span class="o">}</span>
+
+ <span class="k">for</span> <span class="o">(</span><span
class="n">Tuple2</span><span class="o">&lt;</span><span
class="n">Integer</span><span class="o">,</span> <span
class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span>
<span class="o">:</span> <span class="n">values</span><span class="o">)</span>
<span class="o">{</span>
+ <span class="n">leftSd</span> <span class="o">+=</span> <span
class="n">Math</span><span class="o">.</span><span class="na">pow</span><span
class="o">(</span><span class="n">pair</span><span class="o">.</span><span
class="na">f0</span> <span class="o">-</span> <span
class="n">leftMean</span><span class="o">,</span> <span
class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span
class="n">count</span><span class="o">;</span>
+ <span class="n">rightSd</span> <span class="o">+=</span> <span
class="n">Math</span><span class="o">.</span><span class="na">pow</span><span
class="o">(</span><span class="n">pair</span><span class="o">.</span><span
class="na">f1</span> <span class="o">-</span> <span
class="n">rightMean</span><span class="o">,</span> <span
class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span
class="n">count</span><span class="o">;</span>
+ <span class="o">}</span>
+ <span class="n">leftSd</span> <span class="o">=</span> <span
class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span
class="o">(</span><span class="n">leftSd</span><span class="o">);</span>
+ <span class="n">rightSd</span> <span class="o">=</span> <span
class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span
class="o">(</span><span class="n">rightSd</span><span class="o">);</span>
+
+ <span class="n">out</span><span class="o">.</span><span
class="na">collect</span><span class="o">(</span><span class="n">cov</span>
<span class="o">/</span> <span class="o">(</span><span class="n">leftSd</span>
<span class="o">*</span> <span class="n">rightSd</span><span
class="o">));</span>
+ <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+ </div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="other-things-to-try">Other things to try</h2>
+
+<p>For a full feature overview, please check the <a
href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming
Guide</a>, which describes all the available API features.
+You are very welcome to try out our features for different use cases; we are
looking forward to hearing about your experiences. Feel free to <a
href="http://flink.apache.org/community.html#mailing-lists">contact us</a>.</p>
+
+<h2 id="upcoming-for-streaming">Upcoming for streaming</h2>
+
+<p>There are some aspects of Flink Streaming that are subject to
+change by the next release, making this application look even nicer.</p>
+
+<p>Stay tuned for later blog posts on how Flink Streaming works
+internally, fault tolerance, and performance measurements!</p>
+
+<p><a href="#top">Back to top</a></p>
+
+ </article>
+ </div>
+
+ <div class="row">
+ <div id="disqus_thread"></div>
+ <script>
+ // Disqus comment loader.
+ // Forum shortname; the loader below builds the embed.js URL from it.
+ // Left as a global `var`, as in the original snippet; presumably embed.js
+ // reads it, so verify before moving it into the function scope.
+ var disqus_shortname = 'stratosphere-eu';
+
+ (function() {
+ var dsq = document.createElement('script');
+ dsq.async = true;
+ // Explicit https: a protocol-relative URL would fetch this third-party
+ // script over plain HTTP whenever the page itself is served over HTTP.
+ dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js';
+ // Attach to <head> when present, otherwise fall back to <body>.
+ (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
+ })();
+ </script>
+ </div>
+ </div>
+</div>
+ </div>
+ </div>
+
+ <hr />
+
+ <div class="row">
+ <div class="footer text-center col-sm-12">
+ <p>Copyright © 2014-2016 <a href="http://apache.org">The Apache
Software Foundation</a>. All Rights Reserved.</p>
+ <p>Apache Flink, Apache, and the Apache feather logo are either
registered trademarks or trademarks of The Apache Software Foundation.</p>
+ <p><a href="/privacy-policy.html">Privacy Policy</a> · <a
href="/blog/feed.xml">RSS feed</a></p>
+ </div>
+ </div>
+ </div><!-- /.container -->
+
+ <!-- Include all compiled plugins (below), or include individual files as
needed -->
+ <script
src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
+ <script src="/js/codetabs.js"></script>
+ <script src="/js/stickysidebar.js"></script>
+
+
+ <!-- Google Analytics -->
+ <script>
+ // Google Analytics bootstrap.
+ // Defines a global ga() stub that queues its arguments on ga.q, records a
+ // timestamp on ga.l, and inserts an async <script> for the library before
+ // the first <script> element in the document.
+ // The library URL is pinned to https rather than protocol-relative so it is
+ // never fetched over plain HTTP.
+ (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+ (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
+ m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+ })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
+
+ ga('create', 'UA-52545728-1', 'auto');
+ ga('send', 'pageview');
+ </script>
+ </body>
+</html>