This is an automated email from the ASF dual-hosted git repository.

karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new f444372  [Streamlet Scala API] Add Scala Streamlet Integration Tests 
Part II (#2861)
f444372 is described below

commit f444372de2749c8a6953ff98a5178550ec026cd8
Author: Eren Avsarogullari <erenavsarogull...@gmail.com>
AuthorDate: Sat Apr 14 00:37:28 2018 +0100

    [Streamlet Scala API] Add Scala Streamlet Integration Tests Part II (#2861)
    
    * Scala Streamlet Integration Tests Part II
    
    * Merge conflicts have been fixed
---
 .../src/python/test_runner/resources/test.json     |  5 ++
 .../common/ScalaIntegrationTestBase.scala          |  2 +-
 .../ScalaStreamletWithFilterAndTransform.scala     |  1 -
 ...reamletWithMapAndFlatMapAndFilterAndClone.scala | 74 ++++++++++++++++++++++
 ...tWithMapAndFlatMapAndFilterAndCloneResults.json |  1 +
 5 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/integration_test/src/python/test_runner/resources/test.json 
b/integration_test/src/python/test_runner/resources/test.json
index 7d7cf2b..1174154 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -12,6 +12,11 @@
       "topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform",
       "classPath"    : 
"scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform",
       "expectedResultRelativePath" : 
"scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json"
+    },
+    {
+      "topologyName" : 
"IntegrationTest_ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
+      "classPath"    : 
"scala_streamlet_with_map_and_flatmap_and_filter_and_clone.ScalaStreamletWithMapAndFlatMapAndFilterAndClone",
+      "expectedResultRelativePath" : 
"scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json"
     }
   ],
   "javaTopologies": [
diff --git 
a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
 
b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
index 0b36d6a..915233f 100644
--- 
a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
+++ 
b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -21,7 +21,7 @@ import org.apache.heron.streamlet.scala.impl.BuilderImpl
 /**
   * Scala Integration Test Base
   */
-trait ScalaIntegrationTestBase extends Serializable {
+trait ScalaIntegrationTestBase {
 
   protected def build(testTopologyBuilder: TestTopologyBuilder,
                       streamletBuilder: Builder): TestTopologyBuilder = {
diff --git 
a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
 
b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
index 023f57e..7c1d7e8 100644
--- 
a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
+++ 
b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -38,7 +38,6 @@ object ScalaStreamletWithFilterAndTransform {
 /**
   * Scala Streamlet Integration Test
   */
-@SerialVersionUID(-7280407024398984674L)
 class ScalaStreamletWithFilterAndTransform(args: Array[String])
     extends AbstractTestTopology(args)
     with ScalaIntegrationTestBase {
diff --git 
a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
 
b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
new file mode 100644
index 0000000..2742b99
--- /dev/null
+++ 
b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala
@@ -0,0 +1,74 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package 
org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_flatmap_and_filter_and_clone
+
+import scala.collection.mutable.Set
+
+import org.apache.heron.api.Config
+import org.apache.heron.integration_test.common.{
+  AbstractTestTopology,
+  ScalaIntegrationTestBase
+}
+import org.apache.heron.integration_test.core.TestTopologyBuilder
+import org.apache.heron.streamlet.scala.Builder
+
+object ScalaStreamletWithMapAndFlatMapAndFilterAndClone {
+  val months = "january - february - march - april - may - june" +
+    " - july - august - september - october - november - december"
+
+  val summerMonths =
+    List("june", "july", "august")
+
+  val incomingMonths = Set[String]()
+
+  def main(args: Array[String]): Unit = {
+    val conf = new Config
+    val topology = new ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args)
+    topology.submit(conf)
+  }
+}
+
+class ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  import ScalaStreamletWithMapAndFlatMapAndFilterAndClone._
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val streamletBuilder = Builder.newBuilder
+
+    val clonedStreamlet = streamletBuilder
+      .newSource(() => months)
+      .setName("months-text")
+      .flatMap[String]((m: String) => m.split(" - "))
+      .setName("months")
+      .filter((month: String) =>
+        (summerMonths.contains(month.toLowerCase)
+          && incomingMonths.add(month.toLowerCase)))
+      .setName("summer-months")
+      .map[String]((word: String) => word.substring(0, 3))
+      .setName("summer-months-with-short-name")
+      .clone(numClones = 2)
+
+    //Returns Summer Months with Uppercase
+    clonedStreamlet(0).map[String](month => month + "_2018")
+
+    //Returns Summer Months with Uppercase
+    clonedStreamlet(1).map[String](_.toUpperCase)
+
+    build(testTopologyBuilder, streamletBuilder)
+  }
+
+}
diff --git 
a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json
 
b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json
new file mode 100644
index 0000000..6ececf2
--- /dev/null
+++ 
b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json
@@ -0,0 +1 @@
+["AUG", "JUL", "JUN", "aug_2018", "jul_2018", "jun_2018"]
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
karth...@apache.org.

Reply via email to