This is an automated email from the ASF dual-hosted git repository.

karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d417d5  Scala Streamlet API Documentation is being added (#2871)
4d417d5 is described below

commit 4d417d540d3169525e0894029299c4bc7dadf1b4
Author: Eren Avsarogullari <erenavsarogull...@gmail.com>
AuthorDate: Sun Apr 22 22:34:04 2018 +0100

    Scala Streamlet API Documentation is being added (#2871)
---
 .../docs/developers/scala/streamlet-api.mmark      | 408 +++++++++++++++++++++
 website/data/toc.yaml                              |   2 +
 2 files changed, 410 insertions(+)

diff --git a/website/content/docs/developers/scala/streamlet-api.mmark 
b/website/content/docs/developers/scala/streamlet-api.mmark
new file mode 100644
index 0000000..4476567
--- /dev/null
+++ b/website/content/docs/developers/scala/streamlet-api.mmark
@@ -0,0 +1,408 @@
+---
+title: The Heron Streamlet API for Scala
+description: Create Heron topologies in Scala using a functional programming 
style
+---
+
+{{< alert "streamlet-api-beta" >}}
+
+{{content/snippets/heron-streamlet-api.md}}
+
+## Getting started
+
+In order to use the Heron Streamlet API for Scala, you'll need to install the 
`heron-api` library.
+
+### Maven setup
+
+To use the `heron-api` library, add this to the `dependencies` block of your `pom.xml` configuration file:
+
+```xml
+<dependency>
+    <groupId>com.twitter.heron</groupId>
+    <artifactId>heron-api</artifactId>
+    <version>{{< heronVersion >}}</version>
+</dependency>
+```
+
+#### Compiling a JAR with dependencies
+
+In order to run a Scala topology created using the Heron Streamlet API in a 
Heron cluster, you'll need to package your topology as a "fat" JAR with 
dependencies included. You can use the [Maven Assembly 
Plugin](https://maven.apache.org/plugins/maven-assembly-plugin/usage.html) to 
generate JARs with dependencies. To install the plugin and add a Maven goal for 
a single JAR, add this to the `plugins` block in your `pom.xml`:
+
+```xml
+<plugin>
+    <artifactId>maven-assembly-plugin</artifactId>
+    <configuration>
+        <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+        </descriptorRefs>
+        <archive>
+            <manifest>
+                <mainClass></mainClass>
+            </manifest>
+        </archive>
+    </configuration>
+    <executions>
+        <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+                <goal>single</goal>
+            </goals>
+        </execution>
+    </executions>
+</plugin>
+```
+
+Once your `pom.xml` is properly set up, you can compile the JAR with 
dependencies using this command:
+
+```bash
+$ mvn assembly:assembly
+```
+
+By default, this will add a JAR in your project's `target` folder with the 
name `PROJECT-NAME-VERSION-jar-with-dependencies.jar`. Here's an example 
topology submission command using a compiled JAR:
+
+```bash
+$ mvn assembly:assembly
+$ heron submit local \
+  target/my-project-1.2.3-jar-with-dependencies.jar \
+  com.example.Main \
+  MyTopology arg1 arg2
+```
+
+## Streamlet API topology configuration
+
+Every Streamlet API topology needs to be configured using a `Config` object. 
Here's an example default configuration:
+
+```scala
+import com.twitter.heron.streamlet.Config
+import com.twitter.heron.streamlet.scala.{Builder, Runner}
+
+// Build the processing graph to which streamlets are added
+val topologyBuilder = Builder.newBuilder()
+
+val topologyConfig = Config.defaultConfig()
+
+// Apply topology configuration using the topologyConfig object
+val topologyRunner = new Runner()
+topologyRunner.run("name-for-topology", topologyConfig, topologyBuilder)
+```
+
+The table below shows the configurable parameters for Heron topologies:
+
+Parameter | Default
+:---------|:-------
+[Delivery semantics](#delivery-semantics) | At most once
+Serializer | [Kryo](https://github.com/EsotericSoftware/kryo)
+Number of containers | 2
+Per-container CPU | 1.0
+Per-container RAM | 100 MB
+
+Here's an example non-default configuration:
+
+```scala
+val topologyConfig = Config.newBuilder()
+        .setNumContainers(5)
+        .setPerContainerRamInGigabytes(10)
+        .setPerContainerCpu(3.5f)
+        .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
+        .setSerializer(Config.Serializer.JAVA)
+        .setUserConfig("some-key", "some-value")
+        .build()
+```
+
+### Delivery semantics
+
+You can apply [delivery semantics](../../../concepts/delivery-semantics) to a 
Streamlet API topology like this:
+
+```scala
+topologyConfig
+        .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
+```
+
+The other available options in the `DeliverySemantics` enum are `ATMOST_ONCE` 
and `ATLEAST_ONCE`.
+
+## Streamlets
+
+In the Heron Streamlet API for Scala, processing graphs consist of 
[streamlets](../../../concepts/topologies#streamlets). One or more supplier 
streamlets inject data into your graph to be processed by downstream operators.
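+
+Here's a minimal sketch of a supplier streamlet feeding a downstream operator, using the `Builder`, `newSource`, and `map` calls covered below; the random-integer source is just an illustrative placeholder:
+
+```scala
+import scala.util.Random
+
+import com.twitter.heron.streamlet.scala.Builder
+
+val builder = Builder.newBuilder()
+
+// A supplier streamlet injects random integers into the graph;
+// downstream operations (map, filter, etc.) are chained onto it
+builder.newSource(() => Random.nextInt(100))
+    .map[Int]((i: Int) => i * 10)
+```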
+
+## Operations
+
+Operation | Description | Example
+:---------|:------------|:-------
+[`map`](#map-operations) | Create a new streamlet by applying the supplied 
mapping function to each element in the original streamlet | Add 1 to each 
element in a streamlet of integers
+[`flatMap`](#flatmap-operations) | Like a map operation but with the important 
difference that each element of the streamlet is flattened | Flatten a sentence 
into individual words
+[`filter`](#filter-operations) | Create a new streamlet containing only the 
elements that satisfy the supplied filtering function | Remove all 
inappropriate words from a streamlet of strings
+[`union`](#union-operations) | Unifies two streamlets into one, without 
modifying the elements of the two streamlets | Unite two different 
`Streamlet<String>`s into a single streamlet
+[`clone`](#clone-operations) | Creates any number of identical copies of a 
streamlet | Create three separate streamlets from the same source
+[`transform`](#transform-operations) | Transform a streamlet using whichever 
logic you'd like (useful for transformations that don't neatly map onto the 
available operations) |
+[`join`](#join-operations) | Create a new streamlet by combining two separate 
key-value streamlets into one on the basis of each element's key. Supported 
Join Types: Inner (as default), Outer-Left, Outer-Right and Outer | Combine 
key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user 
into a single per-user stream
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs by reducing, for each key, all the values accumulated within a time window, in accordance with a reduce function that you supply | Count the number of times a value has been encountered within a specified time window
+[`repartition`](#repartition-operations) | Create a new streamlet by applying 
a new parallelism level to the original streamlet | Increase the parallelism of 
a streamlet from 5 to 10
+[`toSink`](#sink-operations) | Sink operations terminate the processing graph 
by storing elements in a database, logging elements to stdout, etc. | Store 
processing graph results in an AWS Redshift table
+[`log`](#log-operations) | Logs the final results of a processing graph to 
stdout. This *must* be the last step in the graph. |
+[`consume`](#consume-operations) | Consume operations are like sink operations 
except they don't require implementing a full sink interface (consume 
operations are thus suited for simple operations like logging) | Log processing 
graph results using a custom formatting function
+
+### Map operations
+
+Map operations create a new streamlet by applying the supplied mapping 
function to each element in the original streamlet. Here's an example:
+
+```scala
+builder.newSource(() => 1)
+    .map[Int]((i: Int) => i + 12) // or, equivalently, .map[Int](_.+(12)) using placeholder syntax
+```
+
+In this example, a supplier streamlet emits an indefinite series of 1s. The 
`map` operation then adds 12 to each incoming element, producing a streamlet of 
13s.
+
+### FlatMap operations
+
+FlatMap operations are like `map` operations, with the important difference that the value produced for each element is a collection, which is then "flattened" into the resulting streamlet. In this example, a supplier streamlet emits the same sentence over and over again; the `flatMap` operation splits each sentence into its individual words:
+
+```scala
+builder.newSource(() => "I have nothing to declare but my genius")
+    .flatMap[String](_.split(" "))
+```
+
+The effect of this operation is to transform the `Streamlet[String]` of sentences into a `Streamlet[String]` of individual words.
+
+> One of the core differences between `map` and `flatMap` operations is that the function supplied to a `flatMap` typically produces a collection for each element; those collections are then flattened into the output streamlet, whereas `map` emits each returned value as-is.
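+
+To make the contrast concrete, here's a short sketch reusing the sentence source from the example above; the `map[List[String]]` variant is shown only for illustration and keeps the collection produced by `split` intact, while `flatMap` flattens it:
+
+```scala
+// map: each element becomes a List, so the result is a Streamlet[List[String]]
+builder.newSource(() => "I have nothing to declare but my genius")
+    .map[List[String]](_.split(" ").toList)
+
+// flatMap: the produced collections are flattened, so the result is a
+// Streamlet[String] of individual words
+builder.newSource(() => "I have nothing to declare but my genius")
+    .flatMap[String](_.split(" "))
+```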
+
+### Filter operations
+
+Filter operations create a new streamlet containing only the elements of the original streamlet that satisfy a provided filtering function. Here's an example:
+
+```scala
+import java.util.concurrent.ThreadLocalRandom
+
+builder.newSource(() => ThreadLocalRandom.current().nextInt(1, 11))
+        .filter(_.<(7))
+```
+
+In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a filter operation that retains only the elements lower than 7 (removing all elements of 7 or greater).
+
+### Union operations
+
+Union operations combine two streamlets of the same type into a single 
streamlet without modifying the elements. Here's an example:
+
+```scala
+val flowers = builder.newSource(() => "flower")
+val butterflies = builder.newSource(() => "butterfly")
+
+val combinedSpringStreamlet = flowers.union(butterflies)
+```
+
+Here, one streamlet is an endless series of `"flower"` elements while the other is an endless series of `"butterfly"` elements. The `union` operation combines them into a single streamlet containing elements from both sources.
+
+### Clone operations
+
+Clone operations enable you to create any number of "copies" of a streamlet. 
Each of the "copy" streamlets contains all the elements of the original and can 
be manipulated just like the original streamlet. Here's an example:
+
+```scala
+import scala.util.Random
+
+val integers = builder.newSource(() => Random.nextInt(100))
+
+val copies = integers.clone(5)
+val ints1 = copies.get(0)
+val ints2 = copies.get(1)
+val ints3 = copies.get(2)
+// and so on...
+```
+
+In this example, a streamlet of random integers between 0 and 99 is copied into 5 identical streamlets.
+
+### Transform operations
+
+Transform operations are highly flexible operations that are most useful for:
+
+* operations involving state in [stateful topologies](../../../concepts/delivery-semantics#stateful-topologies)
+* operations that don't neatly fit into the other categories or into lambda-based logic
+
+Transform operations require you to implement three different methods:
+
+* A `setup` function that enables you to pass a context object to the 
operation and to specify what happens prior to the `transform` step
+* A `transform` operation that performs the desired transformation
+* A `cleanup` function that allows you to specify what happens after the 
`transform` step
+
+The context object available to a transform operation provides access to:
+
+* the current state of the topology
+* the topology's configuration
+* the name of the stream
+* the stream partition
+* the current task ID
+
+Here's a Scala example of a transform operation in a topology where a stateful 
record is kept of the number of items processed:
+
+```scala
+import java.util.concurrent.atomic.AtomicLong
+
+import com.twitter.heron.streamlet.Context
+import com.twitter.heron.streamlet.scala.SerializableTransformer
+
+class CountNumberOfItems extends SerializableTransformer[String, String] {
+    private val numberOfItems = new AtomicLong()
+
+    override def setup(context: Context): Unit = {
+      numberOfItems.incrementAndGet()
+      context.getState().put("number-of-items", numberOfItems)
+    }
+
+    override def transform(i: String, f: String => Unit): Unit = {
+      val transformedString = i.toUpperCase
+      f(transformedString)
+    }
+
+    override def cleanup(): Unit =
+      println(s"Successfully processed new state: $numberOfItems")
+  }
+```
+
+This operation does a few things:
+
+* In the `setup` method, the 
[`Context`](/api/java/com/twitter/heron/streamlet/Context.html) object is used 
to access the current state (which has the semantics of a Java `Map`). The 
current number of items processed is incremented by one and then saved as the 
new state.
+* In the `transform` method, the incoming string is converted to uppercase and then "accepted" as the new value via the supplied consumer function.
+* In the `cleanup` step, the current count of items processed is logged.
+
+Here's that operation within the context of a streamlet processing graph:
+
+```scala
+builder.newSource(() => "Some string over and over");
+        .transform(new CountNumberOfItems())
+        .log()
+```
+
+### Join operations
+
+> For a more in-depth conceptual discussion of joins, see the [Heron Streamlet 
API](../../../concepts/streamlet-api#join-operations) doc.
+
+Join operations unify two streamlets *on a key* (join operations thus require KV streamlets). Each `KeyValue` object in a streamlet has, by definition, a key. When a `join` operation is added to a processing graph, elements from the two streamlets whose keys match are combined within the specified window. Here's an example:
+
+```scala
+import com.twitter.heron.streamlet.{Config, KeyValue, WindowConfig}
+import com.twitter.heron.streamlet.scala.Builder
+
+val builder = Builder.newBuilder()
+
+val streamlet1 = builder
+  .newSource(() =>
+    new KeyValue[String, String]("heron-api", "topology-api"))
+  .setName("streamlet1")
+
+val streamlet2 = builder
+  .newSource(() =>
+    new KeyValue[String, String]("heron-api", "streamlet-api"))
+  .setName("streamlet2")
+
+streamlet1.join[KeyValue[String, String], KeyValue[String, String], String](
+  streamlet2,
+  (kv: KeyValue[String, String]) => kv,
+  (kv: KeyValue[String, String]) => kv,
+  WindowConfig.TumblingCountWindow(10),
+  (kv1: KeyValue[String, String], kv2: KeyValue[String, String]) =>
+    kv1.getValue + " - " + kv2.getValue
+)
+```
+
+In this case, the two streamlets emit `KeyValue` objects that share the key `heron-api` but carry different values (`topology-api` and `streamlet-api`). The join function combines the matching values, so the resulting streamlet consists of elements like `topology-api - streamlet-api`.
+
+> The effect of a `join` operation is to create a new streamlet *for each key*.
+
+### Reduce by key and window operations
+
+You can apply 
[reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html)
 operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which value is extracted from each element of the streamlet
+* a [time window](../../../concepts/topologies#window-operations) across which 
the operation will take place
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key and window operations produce a new streamlet of key-value window objects (each containing the extracted key, the calculated value, and information about the window in which the operation took place). Here's an example:
+
+```scala
+import com.twitter.heron.streamlet.WindowConfig
+import com.twitter.heron.streamlet.scala.Builder
+
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  .reduceByKeyAndWindow[String, Int](
+      // Key extractor (in this case, each word acts as the key)
+      (word: String) => word,
+      // Value extractor (each occurrence of a word counts as 1)
+      (word: String) => 1,
+      // Window configuration
+      WindowConfig.TumblingCountWindow(50),
+      // Reduce operation (a running sum)
+      (x: Int, y: Int) => x + y)
+  // The result is logged
+  .log()
+```
+
+### Repartition operations
+
+When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an example:
+
+```scala
+import java.util.concurrent.ThreadLocalRandom
+
+val builder = Builder.newBuilder
+
+val numbers = builder
+  .newSource(() => ThreadLocalRandom.current().nextInt(1, 11))
+
+numbers
+  .setNumPartitions(5)
+  .map(i => i + 1)
+  .repartition(2)
+  .filter(i => i > 7 || i < 2) // retain only values above 7 or below 2
+  .log()
+```
+
+In this example, the supplier streamlet emits random integers between 1 and 
10. That operation is assigned 5 partitions. After the `map` operation, the 
`repartition` function is used to assign 2 partitions to all downstream 
operations.
+
+### Sink operations
+
+In processing graphs like the ones you build using the Heron Streamlet API, 
**sinks** are essentially the terminal points in your graph, where your 
processing logic comes to an end. A processing graph can end with writing to a 
database, publishing to a topic in a pub-sub messaging system, and so on. With 
the Streamlet API, you can implement your own custom sinks. Here's an example:
+
+```scala
+import com.twitter.heron.streamlet.Context
+import com.twitter.heron.streamlet.scala.Sink
+
+class FormattedLogSink extends Sink[String] {
+    private var streamName: Option[String] = None
+
+    override def setup(context: Context): Unit =
+      streamName = Some(context.getStreamName)
+
+    override def put(tuple: String): Unit =
+      println(s"The current value of tuple is $tuple in stream: ${streamName.getOrElse("unknown")}")
+
+    override def cleanup(): Unit = {}
+  }
+```
+
+In this example, the sink fetches the name of the enclosing streamlet from the context passed in the `setup` method. The `put` method specifies how the sink handles each element that it receives (in this case, a formatted message is logged to stdout). The `cleanup` method enables you to specify what happens when the sink is torn down (in this example, nothing).
+
+Here is the `FormattedLogSink` at work in an example processing graph:
+
+```scala
+val builder = Builder.newBuilder
+
+builder.newSource(() => "Here is a string to be passed to the sink")
+        .toSink(new FormattedLogSink)
+```
+
+> [Log operations](#log-operations) rely on a log sink that is provided out of 
the box. You'll need to implement other sinks yourself.
+
+### Log operations
+
+Log operations are special cases of consume operations that log streamlet 
elements to stdout.
+
+> Streamlet elements are logged using their `toString` representations, at the `INFO` level.
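+
+As a quick illustration, here's a minimal sketch that terminates a processing graph with a log operation (the string source is just a placeholder):
+
+```scala
+val builder = Builder.newBuilder
+
+builder.newSource(() => "A string whose elements will be logged")
+        .log()
+```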
+
+### Consume operations
+
+Consume operations are like [sink operations](#sink-operations) except they 
don't require implementing a full sink interface. Consume operations are thus 
suited for simple operations like formatted logging. Here's an example:
+
+```scala
+import scala.util.Random
+
+val builder = Builder.newBuilder
+
+builder.newSource(() => Random.nextInt(10))
+  .filter(i => i % 2 == 0)
+  .consume(i => println(s"Even number found: $i"))
+```
\ No newline at end of file
diff --git a/website/data/toc.yaml b/website/data/toc.yaml
index 590828a..4b381b6 100644
--- a/website/data/toc.yaml
+++ b/website/data/toc.yaml
@@ -19,6 +19,8 @@ sections:
     sublinks:
     - name: The Heron Streamlet API for Java
       url: /docs/developers/java/streamlet-api
+    - name: The Heron Streamlet API for Scala
+      url: /docs/developers/scala/streamlet-api
     - name: The Heron ECO API for Java
       url: /docs/developers/java/eco-api
     - name: The Heron Topology API for Java

-- 
To stop receiving notification emails like this one, please contact
karth...@apache.org.
