This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new f444372 [Streamlet Scala API] Add Scala Streamlet Integration Tests Part II (#2861) f444372 is described below commit f444372de2749c8a6953ff98a5178550ec026cd8 Author: Eren Avsarogullari <erenavsarogull...@gmail.com> AuthorDate: Sat Apr 14 00:37:28 2018 +0100 [Streamlet Scala API] Add Scala Streamlet Integration Tests Part II (#2861) * Scala Streamlet Integration Tests Part II * Merge conflicts have been fixed --- .../src/python/test_runner/resources/test.json | 5 ++ .../common/ScalaIntegrationTestBase.scala | 2 +- .../ScalaStreamletWithFilterAndTransform.scala | 1 - ...reamletWithMapAndFlatMapAndFilterAndClone.scala | 74 ++++++++++++++++++++++ ...tWithMapAndFlatMapAndFilterAndCloneResults.json | 1 + 5 files changed, 81 insertions(+), 2 deletions(-) diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json index 7d7cf2b..1174154 100644 --- a/integration_test/src/python/test_runner/resources/test.json +++ b/integration_test/src/python/test_runner/resources/test.json @@ -12,6 +12,11 @@ "topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform", "classPath" : "scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform", "expectedResultRelativePath" : "scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json" + }, + { + "topologyName" : "IntegrationTest_ScalaStreamletWithMapAndFlatMapAndFilterAndClone", + "classPath" : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone.ScalaStreamletWithMapAndFlatMapAndFilterAndClone", + "expectedResultRelativePath" : "scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json" } ], "javaTopologies": [ diff --git a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala 
b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala index 0b36d6a..915233f 100644 --- a/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala +++ b/integration_test/src/scala/org/apache/heron/integration_test/common/ScalaIntegrationTestBase.scala @@ -21,7 +21,7 @@ import org.apache.heron.streamlet.scala.impl.BuilderImpl /** * Scala Integration Test Base */ -trait ScalaIntegrationTestBase extends Serializable { +trait ScalaIntegrationTestBase { protected def build(testTopologyBuilder: TestTopologyBuilder, streamletBuilder: Builder): TestTopologyBuilder = { diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala index 023f57e..7c1d7e8 100644 --- a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala +++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala @@ -38,7 +38,6 @@ object ScalaStreamletWithFilterAndTransform { /** * Scala Streamlet Integration Test */ -@SerialVersionUID(-7280407024398984674L) class ScalaStreamletWithFilterAndTransform(args: Array[String]) extends AbstractTestTopology(args) with ScalaIntegrationTestBase { diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala new 
file mode 100644 index 0000000..2742b99 --- /dev/null +++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndClone.scala @@ -0,0 +1,74 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.apache.heron.integration_test.topology.scala_streamlet_with_map_and_flatmap_and_filter_and_clone + +import scala.collection.mutable.Set + +import org.apache.heron.api.Config +import org.apache.heron.integration_test.common.{ + AbstractTestTopology, + ScalaIntegrationTestBase +} +import org.apache.heron.integration_test.core.TestTopologyBuilder +import org.apache.heron.streamlet.scala.Builder + +object ScalaStreamletWithMapAndFlatMapAndFilterAndClone { + val months = "january - february - march - april - may - june" + + " - july - august - september - october - november - december" + + val summerMonths = + List("june", "july", "august") + + val incomingMonths = Set[String]() + + def main(args: Array[String]): Unit = { + val conf = new Config + val topology = new ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args) + topology.submit(conf) + } +} + +class ScalaStreamletWithMapAndFlatMapAndFilterAndClone(args: Array[String]) + extends AbstractTestTopology(args) + with ScalaIntegrationTestBase { + + import ScalaStreamletWithMapAndFlatMapAndFilterAndClone._ + + override protected def buildTopology( 
+ testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = { + val streamletBuilder = Builder.newBuilder + + val clonedStreamlet = streamletBuilder + .newSource(() => months) + .setName("months-text") + .flatMap[String]((m: String) => m.split(" - ")) + .setName("months") + .filter((month: String) => + (summerMonths.contains(month.toLowerCase) + && incomingMonths.add(month.toLowerCase))) + .setName("summer-months") + .map[String]((word: String) => word.substring(0, 3)) + .setName("summer-months-with-short-name") + .clone(numClones = 2) + + //Returns Summer Months with "_2018" suffix + clonedStreamlet(0).map[String](month => month + "_2018") + + //Returns Summer Months with Uppercase + clonedStreamlet(1).map[String](_.toUpperCase) + + build(testTopologyBuilder, streamletBuilder) + } + +} diff --git a/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json new file mode 100644 index 0000000..6ececf2 --- /dev/null +++ b/integration_test/src/scala/org/apache/heron/integration_test/topology/scala_streamlet_with_map_and_flatmap_and_filter_and_clone/ScalaStreamletWithMapAndFlatMapAndFilterAndCloneResults.json @@ -0,0 +1 @@ +["AUG", "JUL", "JUN", "aug_2018", "jul_2018", "jun_2018"] \ No newline at end of file -- To stop receiving notification emails like this one, please contact karth...@apache.org.