http://git-wip-us.apache.org/repos/asf/storm-site/blob/a7a19afb/content/releases/0.9.6/Storm-multi-language-protocol-(versions-0.7.0-and-below).html ---------------------------------------------------------------------- diff --git a/content/releases/0.9.6/Storm-multi-language-protocol-(versions-0.7.0-and-below).html b/content/releases/0.9.6/Storm-multi-language-protocol-(versions-0.7.0-and-below).html index be861b3..d869015 100644 --- a/content/releases/0.9.6/Storm-multi-language-protocol-(versions-0.7.0-and-below).html +++ b/content/releases/0.9.6/Storm-multi-language-protocol-(versions-0.7.0-and-below).html @@ -82,18 +82,16 @@ - <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> - + <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> + <li><a href="/releases/1.0.4/index.html">1.0.4</a></li> - - <li><a href="/releases/1.0.3/index.html">1.0.3</a></li> @@ -104,6 +102,8 @@ + + <li><a href="/releases/0.10.2/index.html">0.10.2</a></li> @@ -144,7 +144,7 @@ <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> </ul> </li> - <li><a href="/2017/09/15/storm105-released.html" id="news">News</a></li> + <li><a href="/2018/02/14/storm106-released.html" id="news">News</a></li> </ul> </nav> </div> @@ -210,56 +210,56 @@ file lets the supervisor know the PID so it can shutdown the process later on.</ <li>The rest happens in a while(true) loop</li> <li>STDIN: A tuple! 
This is a JSON encoded structure like this:</li> </ul> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">The</span><span class="w"> </span><span class="err">tuple's</span><span class="w"> </span><span class="err">id</span><span class="w"> - </span><span class="nt">"id"</span><span class="p">:</span><span class="w"> </span><span class="mi">-6955786537413359385</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">The</span><span class="w"> </span><span class="err">id</span><span class="w"> </span><span class="err">of</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">component</span><span class="w"> </span><span class="err">that</span><span class="w"> </span><span class="err">created</span><span class="w"> </span><span class="err">this</span><span class="w"> </span><span class="err">tuple</span><span class="w"> - </span><span class="nt">"comp"</span><span class="p">:</span><span class="w"> </span><span class="mi">1</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">The</span><span class="w"> </span><span class="err">id</span><span class="w"> </span><span class="err">of</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">stream</span><span class="w"> </span><span class="err">this</span><span class="w"> </span><span class="err">tuple</span><span class="w"> </span><span class="err">was</span><span class="w"> </span><span class="err">emitted</span><span class="w"> </span><span class="err">to</span><span class="w"> - </span><span class="nt">"stream"</span><span class="p">:</span><span class="w"> </span><span class="mi">1</span><span class="p">,</span><span class="w"> - </span><span 
class="err">//</span><span class="w"> </span><span class="err">The</span><span class="w"> </span><span class="err">id</span><span class="w"> </span><span class="err">of</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">task</span><span class="w"> </span><span class="err">that</span><span class="w"> </span><span class="err">created</span><span class="w"> </span><span class="err">this</span><span class="w"> </span><span class="err">tuple</span><span class="w"> - </span><span class="nt">"task"</span><span class="p">:</span><span class="w"> </span><span class="mi">9</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">All</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">values</span><span class="w"> </span><span class="err">in</span><span class="w"> </span><span class="err">this</span><span class="w"> </span><span class="err">tuple</span><span class="w"> - </span><span class="nt">"tuple"</span><span class="p">:</span><span class="w"> </span><span class="p">[</span><span class="s2">"snow white and the seven dwarfs"</span><span class="p">,</span><span class="w"> </span><span class="s2">"field2"</span><span class="p">,</span><span class="w"> </span><span class="mi">3</span><span class="p">]</span><span class="w"> -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +<div class="highlight"><pre><code class="language-" data-lang="">{ + // The tuple's id + "id": -6955786537413359385, + // The id of the component that created this tuple + "comp": 1, + // The id of the stream this tuple was emitted to + "stream": 1, + // The id of the task that created this tuple + "task": 9, + // All the values in this tuple + "tuple": ["snow white and the seven dwarfs", "field2", 3] +} +</code></pre></div> <ul> <li>STDOUT: The results of your bolt, JSON encoded. 
This can be a sequence of acks, fails, emits, and/or logs. Emits look like:</li> </ul> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> - </span><span class="nt">"command"</span><span class="p">:</span><span class="w"> </span><span class="s2">"emit"</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">The</span><span class="w"> </span><span class="err">ids</span><span class="w"> </span><span class="err">of</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">tuples</span><span class="w"> </span><span class="err">this</span><span class="w"> </span><span class="err">output</span><span class="w"> </span><span class="err">tuples</span><span class="w"> </span><span class="err">should</span><span class="w"> </span><span class="err">be</span><span class="w"> </span><span class="err">anchored</span><span class="w"> </span><span class="err">to</span><span class="w"> - </span><span class="nt">"anchors"</span><span class="p">:</span><span class="w"> </span><span class="p">[</span><span class="mi">1231231</span><span class="p">,</span><span class="w"> </span><span class="mi">-234234234</span><span class="p">],</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">The</span><span class="w"> </span><span class="err">id</span><span class="w"> </span><span class="err">of</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">stream</span><span class="w"> </span><span class="err">this</span><span class="w"> </span><span class="err">tuple</span><span class="w"> </span><span class="err">was</span><span class="w"> </span><span class="err">emitted</span><span class="w"> </span><span class="err">to.</span><span class="w"> </span><span class="err">Leave</span><span class="w"> </span><span class="err">this</span><span 
class="w"> </span><span class="err">empty</span><span class="w"> </span><span class="err">to</span><span class="w"> </span><span class="err">emit</span><span class="w"> </span><span class="err">to</span><span class="w"> </span><span class="err">default</span><span class="w"> </span><span class="err">stream.</span><span class="w"> - </span><span class="nt">"stream"</span><span class="p">:</span><span class="w"> </span><span class="mi">1</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">If</span><span class="w"> </span><span class="err">doing</span><span class="w"> </span><span class="err">an</span><span class="w"> </span><span class="err">emit</span><span class="w"> </span><span class="err">direct,</span><span class="w"> </span><span class="err">indicate</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">task</span><span class="w"> </span><span class="err">to</span><span class="w"> </span><span class="err">sent</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">tuple</span><span class="w"> </span><span class="err">to</span><span class="w"> - </span><span class="nt">"task"</span><span class="p">:</span><span class="w"> </span><span class="mi">9</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">All</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">values</span><span class="w"> </span><span class="err">in</span><span class="w"> </span><span class="err">this</span><span class="w"> </span><span class="err">tuple</span><span class="w"> - </span><span class="nt">"tuple"</span><span class="p">:</span><span class="w"> </span><span class="p">[</span><span class="s2">"field1"</span><span class="p">,</span><span class="w"> </span><span class="mi">2</span><span class="p">,</span><span 
class="w"> </span><span class="mi">3</span><span class="p">]</span><span class="w"> -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +<div class="highlight"><pre><code class="language-" data-lang="">{ + "command": "emit", + // The ids of the tuples this output tuples should be anchored to + "anchors": [1231231, -234234234], + // The id of the stream this tuple was emitted to. Leave this empty to emit to default stream. + "stream": 1, + // If doing an emit direct, indicate the task to sent the tuple to + "task": 9, + // All the values in this tuple + "tuple": ["field1", 2, 3] +} +</code></pre></div> <p>An ack looks like:</p> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> - </span><span class="nt">"command"</span><span class="p">:</span><span class="w"> </span><span class="s2">"ack"</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">id</span><span class="w"> </span><span class="err">of</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">tuple</span><span class="w"> </span><span class="err">to</span><span class="w"> </span><span class="err">ack</span><span class="w"> - </span><span class="nt">"id"</span><span class="p">:</span><span class="w"> </span><span class="mi">123123</span><span class="w"> -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +<div class="highlight"><pre><code class="language-" data-lang="">{ + "command": "ack", + // the id of the tuple to ack + "id": 123123 +} +</code></pre></div> <p>A fail looks like:</p> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> - </span><span class="nt">"command"</span><span class="p">:</span><span class="w"> </span><span class="s2">"fail"</span><span class="p">,</span><span 
class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">id</span><span class="w"> </span><span class="err">of</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">tuple</span><span class="w"> </span><span class="err">to</span><span class="w"> </span><span class="err">fail</span><span class="w"> - </span><span class="nt">"id"</span><span class="p">:</span><span class="w"> </span><span class="mi">123123</span><span class="w"> -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +<div class="highlight"><pre><code class="language-" data-lang="">{ + "command": "fail", + // the id of the tuple to fail + "id": 123123 +} +</code></pre></div> <p>A "log" will log a message in the worker log. It looks like:</p> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> - </span><span class="nt">"command"</span><span class="p">:</span><span class="w"> </span><span class="s2">"log"</span><span class="p">,</span><span class="w"> - </span><span class="err">//</span><span class="w"> </span><span class="err">the</span><span class="w"> </span><span class="err">message</span><span class="w"> </span><span class="err">to</span><span class="w"> </span><span class="err">log</span><span class="w"> - </span><span class="nt">"msg"</span><span class="p">:</span><span class="w"> </span><span class="s2">"hello world!"</span><span class="w"> +<div class="highlight"><pre><code class="language-" data-lang="">{ + "command": "log", + // the message to log + "msg": "hello world!" -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +} +</code></pre></div> <ul> <li>STDOUT: emit "sync" as a single line by itself when the bolt has finished emitting/acking/failing and is ready for the next input</li> </ul>
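Reviewer note on the protocol page above: the message shapes in this hunk (incoming tuple, then "emit", "ack", and a closing "sync") can be sketched as a small Python handler. This is only an illustration, not Storm's own adapter. The names `handle_tuple` and `render` are hypothetical, the word-splitting logic is an invented example bolt, and two things are assumptions not confirmed by the hunk: that omitting the "stream" key is an acceptable way to leave it empty, and that one JSON document per line is acceptable framing on STDOUT.

```python
import json


def handle_tuple(msg):
    """Build the replies for one incoming tuple (hypothetical example bolt).

    `msg` is the decoded structure from STDIN with the keys shown in the
    page: "id", "comp", "stream", "task", "tuple".
    """
    commands = []
    sentence = msg["tuple"][0]
    for word in sentence.split():
        commands.append({
            "command": "emit",
            # Anchor every output tuple to the input tuple's id.
            "anchors": [msg["id"]],
            # "stream" is omitted (assumption: equivalent to leaving it
            # empty, i.e. the default stream); "task" is omitted because
            # this is not a direct emit.
            "tuple": [word],
        })
    # Acknowledge the input once all outputs have been queued.
    commands.append({"command": "ack", "id": msg["id"]})
    return commands


def render(commands):
    """Serialize the commands and finish with the "sync" line.

    One JSON document per line is an assumed framing; the hunk above only
    states that "sync" must appear as a single line by itself.
    """
    lines = [json.dumps(command) for command in commands]
    lines.append("sync")
    return "\n".join(lines)
```

A bolt's main loop would decode one tuple from STDIN, pass it to `handle_tuple`, and write `render(...)` to STDOUT before reading the next tuple. Note that the sample messages in the page carry `//` comments, which a strict JSON parser would reject, so the sketch assumes comment-free input.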
http://git-wip-us.apache.org/repos/asf/storm-site/blob/a7a19afb/content/releases/0.9.6/Structure-of-the-codebase.html ---------------------------------------------------------------------- diff --git a/content/releases/0.9.6/Structure-of-the-codebase.html b/content/releases/0.9.6/Structure-of-the-codebase.html index 205a54c..c843c08 100644 --- a/content/releases/0.9.6/Structure-of-the-codebase.html +++ b/content/releases/0.9.6/Structure-of-the-codebase.html @@ -82,18 +82,16 @@ - <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> - + <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> + <li><a href="/releases/1.0.4/index.html">1.0.4</a></li> - - <li><a href="/releases/1.0.3/index.html">1.0.3</a></li> @@ -104,6 +102,8 @@ + + <li><a href="/releases/0.10.2/index.html">0.10.2</a></li> @@ -144,7 +144,7 @@ <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> </ul> </li> - <li><a href="/2017/09/15/storm105-released.html" id="news">News</a></li> + <li><a href="/2018/02/14/storm106-released.html" id="news">News</a></li> </ul> </nav> </div> http://git-wip-us.apache.org/repos/asf/storm-site/blob/a7a19afb/content/releases/0.9.6/Support-for-non-java-languages.html ---------------------------------------------------------------------- diff --git a/content/releases/0.9.6/Support-for-non-java-languages.html b/content/releases/0.9.6/Support-for-non-java-languages.html index 0c9130f..17dd812 100644 --- a/content/releases/0.9.6/Support-for-non-java-languages.html +++ b/content/releases/0.9.6/Support-for-non-java-languages.html @@ -82,18 +82,16 @@ - <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> - + <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> + <li><a href="/releases/1.0.4/index.html">1.0.4</a></li> - - <li><a href="/releases/1.0.3/index.html">1.0.3</a></li> @@ -104,6 +102,8 @@ + + <li><a href="/releases/0.10.2/index.html">0.10.2</a></li> @@ -144,7 +144,7 @@ <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> </ul> </li> - <li><a 
href="/2017/09/15/storm105-released.html" id="news">News</a></li> + <li><a href="/2018/02/14/storm106-released.html" id="news">News</a></li> </ul> </nav> </div> http://git-wip-us.apache.org/repos/asf/storm-site/blob/a7a19afb/content/releases/0.9.6/Transactional-topologies.html ---------------------------------------------------------------------- diff --git a/content/releases/0.9.6/Transactional-topologies.html b/content/releases/0.9.6/Transactional-topologies.html index e22a982..4f2a462 100644 --- a/content/releases/0.9.6/Transactional-topologies.html +++ b/content/releases/0.9.6/Transactional-topologies.html @@ -82,18 +82,16 @@ - <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> - + <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> + <li><a href="/releases/1.0.4/index.html">1.0.4</a></li> - - <li><a href="/releases/1.0.3/index.html">1.0.3</a></li> @@ -104,6 +102,8 @@ + + <li><a href="/releases/0.10.2/index.html">0.10.2</a></li> @@ -144,7 +144,7 @@ <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> </ul> </li> - <li><a href="/2017/09/15/storm105-released.html" id="news">News</a></li> + <li><a href="/2018/02/14/storm106-released.html" id="news">News</a></li> </ul> </nav> </div> @@ -211,7 +211,7 @@ <p>After bolt 1 finishes its portion of the processing, it will be idle until the rest of the bolts finish and the next batch can be emitted from the spout.</p> -<h3 id="design-3-storm-39-s-design">Design 3 (Storm's design)</h3> +<h3 id="design-3-storms-design">Design 3 (Storm's design)</h3> <p>A key realization is that not all the work for processing batches of tuples needs to be strongly ordered. 
For example, when computing a global count, there's two parts to the computation:</p> @@ -268,23 +268,23 @@ <span class="kt">int</span> <span class="n">_count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">BatchOutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Object</span> <span class="n">id</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">BatchOutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">Object</span> <span class="n">id</span><span class="o">)</span> <span class="o">{</span> <span class="n">_collector</span> <span class="o">=</span> <span class="n">collector</span><span class="o">;</span> <span class="n">_id</span> <span class="o">=</span> <span class="n">id</span><span class="o">;</span> <span class="o">}</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">execute</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> <span 
class="n">_count</span><span class="o">++;</span> <span class="o">}</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">finishBatch</span><span class="o">()</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">finishBatch</span><span class="o">()</span> <span class="o">{</span> <span class="n">_collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">_id</span><span class="o">,</span> <span class="n">_count</span><span class="o">));</span> <span class="o">}</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">declareOutputFields</span><span class="o">(</span><span class="n">OutputFieldsDeclarer</span> <span class="n">declarer</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">declareOutputFields</span><span class="o">(</span><span class="n">OutputFieldsDeclarer</span> <span class="n">declarer</span><span class="o">)</span> <span class="o">{</span> <span class="n">declarer</span><span class="o">.</span><span class="na">declare</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"id"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">));</span> <span class="o">}</span> <span class="o">}</span> @@ -313,18 +313,18 @@ <span class="kt">int</span> <span class="n">_sum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> 
<span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">BatchOutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">TransactionAttempt</span> <span class="n">attempt</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> <span class="n">TopologyContext</span> <span class="n">context</span><span class="o">,</span> <span class="n">BatchOutputCollector</span> <span class="n">collector</span><span class="o">,</span> <span class="n">TransactionAttempt</span> <span class="n">attempt</span><span class="o">)</span> <span class="o">{</span> <span class="n">_collector</span> <span class="o">=</span> <span class="n">collector</span><span class="o">;</span> <span class="n">_attempt</span> <span class="o">=</span> <span class="n">attempt</span><span class="o">;</span> <span class="o">}</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">execute</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">Tuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> <span class="n">_sum</span><span class="o">+=</span><span class="n">tuple</span><span class="o">.</span><span class="na">getInteger</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> <span class="o">}</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">finishBatch</span><span class="o">()</span> <span class="o">{</span> + <span 
class="kd">public</span> <span class="kt">void</span> <span class="nf">finishBatch</span><span class="o">()</span> <span class="o">{</span> <span class="n">Value</span> <span class="n">val</span> <span class="o">=</span> <span class="n">DATABASE</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">GLOBAL_COUNT_KEY</span><span class="o">);</span> <span class="n">Value</span> <span class="n">newval</span><span class="o">;</span> <span class="k">if</span><span class="o">(</span><span class="n">val</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="o">!</span><span class="n">val</span><span class="o">.</span><span class="na">txid</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">_attempt</span><span class="o">.</span><span class="na">getTransactionId</span><span class="o">()))</span> <span class="o">{</span> @@ -343,7 +343,7 @@ <span class="o">}</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">declareOutputFields</span><span class="o">(</span><span class="n">OutputFieldsDeclarer</span> <span class="n">declarer</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">declareOutputFields</span><span class="o">(</span><span class="n">OutputFieldsDeclarer</span> <span class="n">declarer</span><span class="o">)</span> <span class="o">{</span> <span class="n">declarer</span><span class="o">.</span><span class="na">declare</span><span class="o">(</span><span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"id"</span><span class="o">,</span> <span class="s">"sum"</span><span class="o">));</span> <span class="o">}</span> <span class="o">}</span> @@ -421,7 +421,7 @@ <li><em>Number of active batches permissible at once:</em> You must set a limit to the 
number of batches that can be processed at once. You configure this using the "topology.max.spout.pending" config. If you don't set this config, it will default to 1.</li> </ol> -<h2 id="what-if-you-can-39-t-emit-the-same-batch-of-tuples-for-a-given-transaction-id">What if you can't emit the same batch of tuples for a given transaction id?</h2> +<h2 id="what-if-you-cant-emit-the-same-batch-of-tuples-for-a-given-transaction-id">What if you can't emit the same batch of tuples for a given transaction id?</h2> <p>So far the discussion around transactional topologies has assumed that you can always emit the exact same batch of tuples for the same transaction id. So what do you do if this is not possible?</p> http://git-wip-us.apache.org/repos/asf/storm-site/blob/a7a19afb/content/releases/0.9.6/Trident-API-Overview.html ---------------------------------------------------------------------- diff --git a/content/releases/0.9.6/Trident-API-Overview.html b/content/releases/0.9.6/Trident-API-Overview.html index c42406d..71ab7c4 100644 --- a/content/releases/0.9.6/Trident-API-Overview.html +++ b/content/releases/0.9.6/Trident-API-Overview.html @@ -82,18 +82,16 @@ - <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> - + <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> + <li><a href="/releases/1.0.4/index.html">1.0.4</a></li> - - <li><a href="/releases/1.0.3/index.html">1.0.3</a></li> @@ -104,6 +102,8 @@ + + <li><a href="/releases/0.10.2/index.html">0.10.2</a></li> @@ -144,7 +144,7 @@ <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> </ul> </li> - <li><a href="/2017/09/15/storm105-released.html" id="news">News</a></li> + <li><a href="/2018/02/14/storm106-released.html" id="news">News</a></li> </ul> </nav> </div> @@ -182,7 +182,7 @@ <p>A function takes in a set of input fields and emits zero or more tuples as output. The fields of the output tuple are appended to the original input tuple in the stream. 
If a function emits no tuples, the original input tuple is filtered out. Otherwise, the input tuple is duplicated for each output tuple. Suppose you have this function:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyFunction</span> <span class="kd">extends</span> <span class="n">BaseFunction</span> <span class="o">{</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> <span class="k">for</span><span class="o">(</span><span class="kt">int</span> <span class="n">i</span><span class="o">=</span><span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getInteger</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span> <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">i</span><span class="o">));</span> <span class="o">}</span> @@ -206,7 +206,7 @@ <p>Filters take in a tuple as input and decide whether or not to keep that tuple or not. 
Suppose you had this filter:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyFilter</span> <span class="kd">extends</span> <span class="n">BaseFunction</span> <span class="o">{</span> - <span class="kd">public</span> <span class="kt">boolean</span> <span class="n">isKeep</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">isKeep</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getInteger</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">==</span> <span class="mi">1</span> <span class="o">&&</span> <span class="n">tuple</span><span class="o">.</span><span class="na">getInteger</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">==</span> <span class="mi">2</span><span class="o">;</span> <span class="o">}</span> <span class="o">}</span> @@ -255,22 +255,22 @@ Partition 2: <p>Here's the interface for CombinerAggregator:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">CombinerAggregator</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> - <span class="n">T</span> <span class="n">init</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> - <span class="n">T</span> <span class="n">combine</span><span class="o">(</span><span 
class="n">T</span> <span class="n">val1</span><span class="o">,</span> <span class="n">T</span> <span class="n">val2</span><span class="o">);</span> - <span class="n">T</span> <span class="n">zero</span><span class="o">();</span> + <span class="n">T</span> <span class="nf">init</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> + <span class="n">T</span> <span class="nf">combine</span><span class="o">(</span><span class="n">T</span> <span class="n">val1</span><span class="o">,</span> <span class="n">T</span> <span class="n">val2</span><span class="o">);</span> + <span class="n">T</span> <span class="nf">zero</span><span class="o">();</span> <span class="o">}</span> </code></pre></div> <p>A CombinerAggregator returns a single tuple with a single field as output. CombinerAggregators run the init function on each input tuple and use the combine function to combine values until there's only one value left. If there's no tuples in the partition, the CombinerAggregator emits the output of the zero function. 
For example, here's the implementation of Count:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Count</span> <span class="kd">implements</span> <span class="n">CombinerAggregator</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="o">{</span> - <span class="kd">public</span> <span class="n">Long</span> <span class="n">init</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">Long</span> <span class="nf">init</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="mi">1L</span><span class="o">;</span> <span class="o">}</span> - <span class="kd">public</span> <span class="n">Long</span> <span class="n">combine</span><span class="o">(</span><span class="n">Long</span> <span class="n">val1</span><span class="o">,</span> <span class="n">Long</span> <span class="n">val2</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">Long</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Long</span> <span class="n">val1</span><span class="o">,</span> <span class="n">Long</span> <span class="n">val2</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">val1</span> <span class="o">+</span> <span class="n">val2</span><span class="o">;</span> <span class="o">}</span> - <span class="kd">public</span> <span class="n">Long</span> <span class="n">zero</span><span class="o">()</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">Long</span> <span class="nf">zero</span><span class="o">()</span> <span class="o">{</span> <span 
class="k">return</span> <span class="mi">0L</span><span class="o">;</span> <span class="o">}</span> <span class="o">}</span> @@ -279,17 +279,17 @@ Partition 2: <p>A ReducerAggregator has the following interface:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">ReducerAggregator</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="kd">extends</span> <span class="n">Serializable</span> <span class="o">{</span> - <span class="n">T</span> <span class="n">init</span><span class="o">();</span> - <span class="n">T</span> <span class="n">reduce</span><span class="o">(</span><span class="n">T</span> <span class="n">curr</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> + <span class="n">T</span> <span class="nf">init</span><span class="o">();</span> + <span class="n">T</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">T</span> <span class="n">curr</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> <span class="o">}</span> </code></pre></div> <p>A ReducerAggregator produces an initial value with init, and then it iterates on that value for each input tuple to produce a single tuple with a single value as output. 
For example, here's how you would define Count as a ReducerAggregator:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Count</span> <span class="kd">implements</span> <span class="n">ReducerAggregator</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="o">{</span> - <span class="kd">public</span> <span class="n">Long</span> <span class="n">init</span><span class="o">()</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">Long</span> <span class="nf">init</span><span class="o">()</span> <span class="o">{</span> <span class="k">return</span> <span class="mi">0L</span><span class="o">;</span> <span class="o">}</span> - <span class="kd">public</span> <span class="n">Long</span> <span class="n">reduce</span><span class="o">(</span><span class="n">Long</span> <span class="n">curr</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">Long</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Long</span> <span class="n">curr</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">curr</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span> <span class="o">}</span> <span class="o">}</span> @@ -298,9 +298,9 @@ Partition 2: <p>The most general interface for performing aggregations is Aggregator, which looks like this:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">Aggregator</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span 
class="kd">extends</span> <span class="n">Operation</span> <span class="o">{</span> - <span class="n">T</span> <span class="n">init</span><span class="o">(</span><span class="n">Object</span> <span class="n">batchId</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">);</span> - <span class="kt">void</span> <span class="n">aggregate</span><span class="o">(</span><span class="n">T</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">);</span> - <span class="kt">void</span> <span class="n">complete</span><span class="o">(</span><span class="n">T</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">);</span> + <span class="n">T</span> <span class="nf">init</span><span class="o">(</span><span class="n">Object</span> <span class="n">batchId</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">aggregate</span><span class="o">(</span><span class="n">T</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">complete</span><span class="o">(</span><span class="n">T</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">);</span> <span class="o">}</span> </code></pre></div> <p>Aggregators can emit any number of tuples with any number of fields. 
They can emit tuples at any point during execution. Aggregators execute in the following way:</p> @@ -317,15 +317,15 @@ Partition 2: <span class="kt">long</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="o">}</span> - <span class="kd">public</span> <span class="n">CountState</span> <span class="n">init</span><span class="o">(</span><span class="n">Object</span> <span class="n">batchId</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="n">CountState</span><span class="o">();</span> + <span class="kd">public</span> <span class="n">CountState</span> <span class="nf">init</span><span class="o">(</span><span class="n">Object</span> <span class="n">batchId</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="k">new</span> <span class="nf">CountState</span><span class="o">();</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">aggregate</span><span class="o">(</span><span class="n">CountState</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">aggregate</span><span class="o">(</span><span class="n">CountState</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span 
class="o">)</span> <span class="o">{</span> <span class="n">state</span><span class="o">.</span><span class="na">count</span><span class="o">+=</span><span class="mi">1</span><span class="o">;</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">complete</span><span class="o">(</span><span class="n">CountState</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">complete</span><span class="o">(</span><span class="n">CountState</span> <span class="n">state</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">state</span><span class="o">.</span><span class="na">count</span><span class="o">));</span> <span class="o">}</span> <span class="o">}</span> http://git-wip-us.apache.org/repos/asf/storm-site/blob/a7a19afb/content/releases/0.9.6/Trident-spouts.html ---------------------------------------------------------------------- diff --git a/content/releases/0.9.6/Trident-spouts.html b/content/releases/0.9.6/Trident-spouts.html index 4a8faaf..dbe8eb4 100644 --- a/content/releases/0.9.6/Trident-spouts.html +++ b/content/releases/0.9.6/Trident-spouts.html @@ -82,18 +82,16 @@ - <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> - + <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> + <li><a href="/releases/1.0.4/index.html">1.0.4</a></li> - - <li><a href="/releases/1.0.3/index.html">1.0.3</a></li> @@ -104,6 +102,8 @@ + + <li><a href="/releases/0.10.2/index.html">0.10.2</a></li> @@ -144,7 +144,7 @@ <li><a 
href="/contribute/BYLAWS.html">ByLaws</a></li> </ul> </li> - <li><a href="/2017/09/15/storm105-released.html" id="news">News</a></li> + <li><a href="/2018/02/14/storm106-released.html" id="news">News</a></li> </ul> </nav> </div> http://git-wip-us.apache.org/repos/asf/storm-site/blob/a7a19afb/content/releases/0.9.6/Trident-state.html ---------------------------------------------------------------------- diff --git a/content/releases/0.9.6/Trident-state.html b/content/releases/0.9.6/Trident-state.html index 6cb78ca..e0f9f31 100644 --- a/content/releases/0.9.6/Trident-state.html +++ b/content/releases/0.9.6/Trident-state.html @@ -82,18 +82,16 @@ - <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> - + <li><a href="/releases/1.1.0/index.html">1.1.0</a></li> + <li><a href="/releases/1.0.4/index.html">1.0.4</a></li> - - <li><a href="/releases/1.0.3/index.html">1.0.3</a></li> @@ -104,6 +102,8 @@ + + <li><a href="/releases/0.10.2/index.html">0.10.2</a></li> @@ -144,7 +144,7 @@ <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> </ul> </li> - <li><a href="/2017/09/15/storm105-released.html" id="news">News</a></li> + <li><a href="/2018/02/14/storm106-released.html" id="news">News</a></li> </ul> </nav> </div> @@ -232,23 +232,23 @@ apple => [count=10, txid=2] <p>With opaque transactional spouts, it's no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates.</p> <p>What you can do is store more state in the database. Rather than store a value and transaction id in the database, you instead store a value, transaction id, and the previous value in the database. Let's again use the example of storing a count in the database. Suppose the partial count for your batch is "2" and it's time to apply a state update. 
Suppose the value in the database looks like this:</p> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> </span><span class="err">value</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">4,</span><span class="w"> - </span><span class="err">prevValue</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">1,</span><span class="w"> - </span><span class="err">txid</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">2</span><span class="w"> -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +<div class="highlight"><pre><code class="language-" data-lang="">{ value = 4, + prevValue = 1, + txid = 2 +} +</code></pre></div> <p>Suppose your current txid is 3, different than what's in the database. In this case, you set "prevValue" equal to "value", increment "value" by your partial count, and update the txid. The new database value will look like this:</p> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> </span><span class="err">value</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">6,</span><span class="w"> - </span><span class="err">prevValue</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">4,</span><span class="w"> - </span><span class="err">txid</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">3</span><span class="w"> -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +<div class="highlight"><pre><code class="language-" data-lang="">{ value = 6, + prevValue = 4, + txid = 3 +} +</code></pre></div> <p>Now suppose your current txid is 2, equal to what's in the database. 
Now you know that the "value" in the database contains an update from a previous batch for your current txid, but that batch may have been different so you have to ignore it. What you do in this case is increment "prevValue" by your partial count to compute the new "value". You then set the value in the database to this:</p> -<div class="highlight"><pre><code class="language-" data-lang=""><span class="p">{</span><span class="w"> </span><span class="err">value</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">3,</span><span class="w"> - </span><span class="err">prevValue</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">1,</span><span class="w"> - </span><span class="err">txid</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="err">2</span><span class="w"> -</span><span class="p">}</span><span class="w"> -</span></code></pre></div> +<div class="highlight"><pre><code class="language-" data-lang="">{ value = 3, + prevValue = 1, + txid = 2 +} +</code></pre></div> <p>This works because of the strong ordering of batches provided by Trident. Once Trident moves onto a new batch for state updates, it will never go back to a previous batch. 
And since opaque transactional spouts guarantee no overlap between batches (that each tuple is successfully processed by one batch), you can safely update based on the previous value.</p> <h2 id="non-transactional-spouts">Non-transactional spouts</h2> @@ -280,33 +280,33 @@ apple => [count=10, txid=2] <p>The base State interface just has two methods:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">State</span> <span class="o">{</span> - <span class="kt">void</span> <span class="n">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">);</span> <span class="c1">// can be null for things like partitionPersist occurring off a DRPC stream</span> - <span class="kt">void</span> <span class="n">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">);</span> <span class="c1">// can be null for things like partitionPersist occurring off a DRPC stream</span> + <span class="kt">void</span> <span class="nf">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">);</span> <span class="o">}</span> </code></pre></div> <p>You're told when a state update is beginning, when a state update is ending, and you're given the txid in each case. Trident assumes nothing about how your state works, what kind of methods there are to update it, and what kind of methods there are to read from it.</p> <p>Suppose you have a home-grown database that contains user location information and you want to be able to access it from Trident. 
Your State implementation would have methods for getting and setting user information:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationDB</span> <span class="kd">implements</span> <span class="n">State</span> <span class="o">{</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">setLocation</span><span class="o">(</span><span class="kt">long</span> <span class="n">userId</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setLocation</span><span class="o">(</span><span class="kt">long</span> <span class="n">userId</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// code to access database and set location</span> <span 
class="o">}</span> - <span class="kd">public</span> <span class="n">String</span> <span class="n">getLocation</span><span class="o">(</span><span class="kt">long</span> <span class="n">userId</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">String</span> <span class="nf">getLocation</span><span class="o">(</span><span class="kt">long</span> <span class="n">userId</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// code to get location from database</span> <span class="o">}</span> <span class="o">}</span> </code></pre></div> <p>You then provide Trident a StateFactory that can create instances of your State object within Trident tasks. The StateFactory for your LocationDB might look something like this:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationDBFactory</span> <span class="kd">implements</span> <span class="n">StateFactory</span> <span class="o">{</span> - <span class="kd">public</span> <span class="n">State</span> <span class="n">makeState</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> <span class="kt">int</span> <span class="n">partitionIndex</span><span class="o">,</span> <span class="kt">int</span> <span class="n">numPartitions</span><span class="o">)</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="n">LocationDB</span><span class="o">();</span> + <span class="kd">public</span> <span class="n">State</span> <span class="nf">makeState</span><span class="o">(</span><span class="n">Map</span> <span class="n">conf</span><span class="o">,</span> <span class="kt">int</span> <span class="n">partitionIndex</span><span class="o">,</span> <span class="kt">int</span> <span class="n">numPartitions</span><span class="o">)</span> <span class="o">{</span> + <span 
class="k">return</span> <span class="k">new</span> <span class="nf">LocationDB</span><span class="o">();</span> <span class="o">}</span> <span class="o">}</span> </code></pre></div> @@ -318,7 +318,7 @@ apple => [count=10, txid=2] </code></pre></div> <p>Now let's take a look at what the implementation of QueryLocation would look like:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">QueryLocation</span> <span class="kd">extends</span> <span class="n">BaseQueryFunction</span><span class="o"><</span><span class="n">LocationDB</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="o">{</span> - <span class="kd">public</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">batchRetrieve</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">TridentTuple</span><span class="o">></span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="nf">batchRetrieve</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">TridentTuple</span><span class="o">></span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">ret</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">();</span> <span class="k">for</span><span class="o">(</span><span 
class="n">TridentTuple</span> <span class="nl">input:</span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span> <span class="n">ret</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">state</span><span class="o">.</span><span class="na">getLocation</span><span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)));</span> @@ -326,7 +326,7 @@ apple => [count=10, txid=2] <span class="k">return</span> <span class="n">ret</span><span class="o">;</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">location</span><span class="o">));</span> <span class="o">}</span> <span class="o">}</span> @@ -335,24 +335,24 @@ apple => [count=10, txid=2] <p>You can see that this code doesn't take advantage of the batching that Trident does, since it just queries the LocationDB one at a time. 
So a better way to write the LocationDB would be like this:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationDB</span> <span class="kd">implements</span> <span class="n">State</span> <span class="o">{</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">beginCommit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">commit</span><span class="o">(</span><span class="n">Long</span> <span class="n">txid</span><span class="o">)</span> <span class="o">{</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">setLocationsBulk</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">userIds</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">locations</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setLocationsBulk</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">userIds</span><span 
class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">locations</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// set locations in bulk</span> <span class="o">}</span> - <span class="kd">public</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">bulkGetLocations</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">userIds</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="nf">bulkGetLocations</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">userIds</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// get locations in bulk</span> <span class="o">}</span> <span class="o">}</span> </code></pre></div> <p>Then, you can write the QueryLocation function like this:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">QueryLocation</span> <span class="kd">extends</span> <span class="n">BaseQueryFunction</span><span class="o"><</span><span class="n">LocationDB</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="o">{</span> - <span class="kd">public</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">batchRetrieve</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span 
class="n">TridentTuple</span><span class="o">></span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="nf">batchRetrieve</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">TridentTuple</span><span class="o">></span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span> <span class="n">List</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">userIds</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">Long</span><span class="o">>();</span> <span class="k">for</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="nl">input:</span> <span class="n">inputs</span><span class="o">)</span> <span class="o">{</span> <span class="n">userIds</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">));</span> @@ -360,7 +360,7 @@ apple => [count=10, txid=2] <span class="k">return</span> <span class="n">state</span><span class="o">.</span><span class="na">bulkGetLocations</span><span class="o">(</span><span class="n">userIds</span><span class="o">);</span> <span class="o">}</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">,</span> <span class="n">TridentCollector</span> <span 
class="n">collector</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">execute</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">,</span> <span class="n">String</span> <span class="n">location</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> <span class="n">collector</span><span class="o">.</span><span class="na">emit</span><span class="o">(</span><span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">location</span><span class="o">));</span> <span class="o">}</span> <span class="o">}</span> @@ -369,7 +369,7 @@ apple => [count=10, txid=2] <p>To update state, you make use of the StateUpdater interface. Here's a StateUpdater that updates a LocationDB with new location information:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">LocationUpdater</span> <span class="kd">extends</span> <span class="n">BaseStateUpdater</span><span class="o"><</span><span class="n">LocationDB</span><span class="o">></span> <span class="o">{</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="n">updateState</span><span class="o">(</span><span class="n">LocationDB</span> <span class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">TridentTuple</span><span class="o">></span> <span class="n">tuples</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">updateState</span><span class="o">(</span><span class="n">LocationDB</span> <span 
class="n">state</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">TridentTuple</span><span class="o">></span> <span class="n">tuples</span><span class="o">,</span> <span class="n">TridentCollector</span> <span class="n">collector</span><span class="o">)</span> <span class="o">{</span> <span class="n">List</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="n">ids</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">Long</span><span class="o">>();</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">locations</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">String</span><span class="o">>();</span> <span class="k">for</span><span class="o">(</span><span class="n">TridentTuple</span> <span class="nl">t:</span> <span class="n">tuples</span><span class="o">)</span> <span class="o">{</span> @@ -404,16 +404,16 @@ apple => [count=10, txid=2] </code></pre></div> <p>persistentAggregate is an additional abstraction built on top of partitionPersist that knows how to take a Trident aggregator and use it to apply updates to the source of state. In this case, since this is a grouped stream, Trident expects the state you provide to implement the "MapState" interface. The grouping fields will be the keys in the state, and the aggregation result will be the values in the state. 
The "MapState" interface looks like this:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MapState</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="kd">extends</span> <span class="n">State</span> <span class="o">{</span> - <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">multiGet</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">);</span> - <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">multiUpdate</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">ValueUpdater</span><span class="o">></span> <span class="n">updaters</span><span class="o">);</span> - <span class="kt">void</span> <span class="n">multiPut</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">vals</span><span class="o">);</span> + <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="nf">multiGet</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span 
class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">);</span> + <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="nf">multiUpdate</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">ValueUpdater</span><span class="o">></span> <span class="n">updaters</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">multiPut</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">vals</span><span class="o">);</span> <span class="o">}</span> </code></pre></div> <p>When you do aggregations on non-grouped streams (a global aggregation), Trident expects your State object to implement the "Snapshottable" interface:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">Snapshottable</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="kd">extends</span> <span class="n">State</span> <span class="o">{</span> - <span class="n">T</span> <span class="n">get</span><span class="o">();</span> - <span class="n">T</span> <span class="n">update</span><span class="o">(</span><span class="n">ValueUpdater</span> <span class="n">updater</span><span class="o">);</span> - <span class="kt">void</span> <span class="n">set</span><span class="o">(</span><span class="n">T</span> <span 
class="n">o</span><span class="o">);</span> + <span class="n">T</span> <span class="nf">get</span><span class="o">();</span> + <span class="n">T</span> <span class="nf">update</span><span class="o">(</span><span class="n">ValueUpdater</span> <span class="n">updater</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">set</span><span class="o">(</span><span class="n">T</span> <span class="n">o</span><span class="o">);</span> <span class="o">}</span> </code></pre></div> <p><a href="https://github.com/apache/incubator-storm/blob/0.9.6/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java">MemoryMapState</a> and <a href="https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java">MemcachedState</a> each implement both of these interfaces.</p> @@ -422,8 +422,8 @@ apple => [count=10, txid=2] <p>Trident makes it easy to implement a MapState, doing almost all the work for you. The OpaqueMap, TransactionalMap, and NonTransactionalMap classes implement the respective fault-tolerance logic. You simply provide these classes with an IBackingMap implementation that knows how to do multiGets and multiPuts of the respective key/values. 
IBackingMap looks like this:</p> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">IBackingMap</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="o">{</span> - <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">multiGet</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">);</span> - <span class="kt">void</span> <span class="n">multiPut</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">vals</span><span class="o">);</span> + <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="nf">multiGet</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">multiPut</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="n">keys</span><span class="o">,</span> <span class="n">List</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">vals</span><span class="o">);</span> <span class="o">}</span> </code></pre></div> <p>OpaqueMap's 
will call multiPut with <a href="https://github.com/apache/incubator-storm/blob/0.9.6/storm-core/src/jvm/storm/trident/state/OpaqueValue.java">OpaqueValue</a> objects for the vals, TransactionalMaps will pass <a href="https://github.com/apache/incubator-storm/blob/0.9.6/storm-core/src/jvm/storm/trident/state/TransactionalValue.java">TransactionalValue</a> objects for the vals, and NonTransactionalMaps will just pass the objects from the topology through.</p>
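<p>As a rough illustration of the IBackingMap contract described above, here is a minimal in-memory sketch. The class name InMemoryBackingMap is hypothetical and not part of Storm, the interface is redeclared locally so the example compiles without Storm on the classpath, and returning null for a key that has never been written is an assumption of this sketch rather than something the text above specifies.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local copy of the interface quoted above, redeclared so this sketch
// compiles on its own. In a real topology you would use Storm's own type.
interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Hypothetical in-memory backing map (illustrative name, not a Storm class).
class InMemoryBackingMap<T> implements IBackingMap<T> {
    // Each key is a list of grouping-field values; List.equals/hashCode
    // make it usable as a HashMap key.
    private final Map<List<Object>, T> store = new HashMap<>();

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        // The result is positional: result.get(i) belongs to keys.get(i).
        List<T> result = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            // Assumption of this sketch: a missing key yields null.
            result.add(store.get(key));
        }
        return result;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        if (keys.size() != vals.size()) {
            throw new IllegalArgumentException("keys and vals must have the same length");
        }
        for (int i = 0; i < keys.size(); i++) {
            // Copy the key defensively so later mutation by the caller
            // cannot corrupt the map.
            store.put(new ArrayList<>(keys.get(i)), vals.get(i));
        }
    }
}
```

<p>With a wrapper such as OpaqueMap in front of it, the type parameter T would be the wrapper's value type (for example OpaqueValue) rather than the raw aggregation result, as the paragraph above describes.</p>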