This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new ad29cc3 [Streamlet Scala API] Add Scala Streamlet Integration Tests Part I (#2826) ad29cc3 is described below commit ad29cc36d6f5e3e55a72dd20c07e0ffad0c02a88 Author: Eren Avsarogullari <erenavsarogull...@gmail.com> AuthorDate: Sun Apr 8 00:07:09 2018 +0100 [Streamlet Scala API] Add Scala Streamlet Integration Tests Part I (#2826) * Scala Streamlet Integration Tests Part I * Minor change is applied to TextTransformer * Fix Scala Streamlet Integration Test Packaging Problem * Fix Scala Streamlet Integration Test Result Problem * Add Fix for Travis * Add Fix for Travis --- .gitignore | 3 + .../twitter/heron/streamlet/impl/BuilderImpl.java | 6 +- .../streamlet/scala/SerializableTransformer.scala | 2 - .../heron/streamlet/scala/impl/BuilderImpl.scala | 3 + integration_test/src/python/test_runner/main.py | 8 ++- .../src/python/test_runner/resources/test.json | 7 +++ integration_test/src/scala/BUILD | 28 +++++++++ .../common/ScalaIntegrationTestBase.scala | 32 ++++++++++ .../ScalaStreamletWithFilterAndTransform.scala | 71 ++++++++++++++++++++++ ...calaStreamletWithFilterAndTransformResults.json | 1 + scripts/applatix/javatests.sh | 19 +++++- scripts/applatix/test.sh | 19 +++++- scripts/packages/BUILD | 11 ++++ scripts/run_integration_test.sh | 9 +++ scripts/travis/test.sh | 19 +++++- 15 files changed, 226 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 927928b..12fbf25 100644 --- a/.gitignore +++ b/.gitignore @@ -132,3 +132,6 @@ website/public/ # Visual Studio Code .vscode + +# integration_test +results/ \ No newline at end of file diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java index e00eb14..b982453 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java @@ -11,10 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package com.twitter.heron.streamlet.impl; - import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -60,6 +58,10 @@ public final class BuilderImpl implements Builder { */ public TopologyBuilder build() { TopologyBuilder builder = new TopologyBuilder(); + return build(builder); + } + + public TopologyBuilder build(TopologyBuilder builder) { Set<String> stageNames = new HashSet<>(); for (StreamletImpl<?> streamlet : sources) { streamlet.build(builder, stageNames); diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala index 7867448..7bbbdd2 100644 --- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala +++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala @@ -13,8 +13,6 @@ // limitations under the License. package com.twitter.heron.streamlet.scala -import java.io.Serializable - import com.twitter.heron.streamlet.Context /** diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala index d6cd4c8..4458d70 100644 --- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala +++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala @@ -37,4 +37,7 @@ class BuilderImpl(builder: com.twitter.heron.streamlet.Builder) def build(): TopologyBuilder = builder.asInstanceOf[JavaBuilderImpl].build() + def build(topologyBuilder: TopologyBuilder): TopologyBuilder = + builder.asInstanceOf[JavaBuilderImpl].build(topologyBuilder) + } diff --git a/integration_test/src/python/test_runner/main.py b/integration_test/src/python/test_runner/main.py index 861a951..f5d4d1c 100644 --- a/integration_test/src/python/test_runner/main.py +++ b/integration_test/src/python/test_runner/main.py @@ -293,11 +293,15 @@ def run_tests(conf, args): http_server_host_port = "%s:%d" % (args.http_server_hostname, args.http_server_port) - if args.tests_bin_path.endswith(".jar"): + if args.tests_bin_path.endswith("scala-integration-tests.jar"): + test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern) + topology_classpath_prefix = conf["topologyClasspathPrefix"] + extra_topology_args = "-s http://%s/state" % http_server_host_port + elif args.tests_bin_path.endswith("integration-tests.jar"): test_topologies = filter_test_topologies(conf["javaTopologies"], args.test_topology_pattern) topology_classpath_prefix = conf["topologyClasspathPrefix"] extra_topology_args = "-s http://%s/state" % http_server_host_port - elif args.tests_bin_path.endswith(".pex"): + elif args.tests_bin_path.endswith("heron_integ_topology.pex"): test_topologies = filter_test_topologies(conf["pythonTopologies"], args.test_topology_pattern) topology_classpath_prefix = "" extra_topology_args = "" diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json index 049c58c..ba0addd 100644 --- a/integration_test/src/python/test_runner/resources/test.json +++ b/integration_test/src/python/test_runner/resources/test.json @@ -7,6 +7,13 @@ "cliConfigPath" : "$HOME/.heron/conf", "topologyClasspathPrefix" : "com.twitter.heron.integration_test.topology.", "releasePackageUri" : "scheme://role/name/version", + "scalaTopologies": [ + { + "topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform", + "classPath" : "scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform", + "expectedResultRelativePath" : "scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json" + } + ], "javaTopologies": [ { "topologyName" : "IntegrationTest_FieldsGrouping", diff --git a/integration_test/src/scala/BUILD b/integration_test/src/scala/BUILD new file mode 100644 index 0000000..48b7d81 --- /dev/null +++ b/integration_test/src/scala/BUILD @@ -0,0 +1,28 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +filegroup( + name = "test-data-files", + srcs = glob(["**/*.json"]), +) + +scala_binary( + name = "scala-integration-tests-unshaded", + srcs = glob(["com/twitter/heron/integration_test/**/*.scala"]), + deps = [ + "//heron/api/src/java:api-java", + "//heron/api/src/scala:api-scala", + "//integration_test/src/java:common", + "//integration_test/src/java:core", + "//heron/api/src/java:api-java-low-level" + ], + main_class = "com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform" +) + +genrule( + name = 'scala-integration-tests', + srcs = [":scala-integration-tests-unshaded_deploy.jar"], + outs = ["scala-integration-tests.jar"], + cmd = "cp $< $@" +) \ No newline at end of file diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala new file mode 100644 index 0000000..9563a1f --- /dev/null +++ b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala @@ -0,0 +1,32 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.integration_test.common + +import com.twitter.heron.integration_test.core.TestTopologyBuilder +import com.twitter.heron.streamlet.scala.Builder +import com.twitter.heron.streamlet.scala.impl.BuilderImpl + +/** + * Scala Integration Test Base + */ +trait ScalaIntegrationTestBase extends Serializable { + + protected def build(testTopologyBuilder: TestTopologyBuilder, + streamletBuilder: Builder): TestTopologyBuilder = { + val streamletBuilderImpl = streamletBuilder.asInstanceOf[BuilderImpl] + val topologyBuilder = streamletBuilderImpl.build(testTopologyBuilder) + topologyBuilder.asInstanceOf[TestTopologyBuilder] + } + +} diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala new file mode 100644 index 0000000..3408032 --- /dev/null +++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala @@ -0,0 +1,71 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform + +import java.util.concurrent.atomic.AtomicInteger + +import com.twitter.heron.api.Config +import com.twitter.heron.integration_test.common.{ + AbstractTestTopology, + ScalaIntegrationTestBase +} +import com.twitter.heron.integration_test.core.TestTopologyBuilder +import com.twitter.heron.streamlet.Context +import com.twitter.heron.streamlet.scala.{Builder, SerializableTransformer} + +object ScalaStreamletWithFilterAndTransform { + def main(args: Array[String]): Unit = { + val conf = new Config + val topology = new ScalaStreamletWithFilterAndTransform(args) + topology.submit(conf) + } +} + +/** + * Scala Streamlet Integration Test + */ +@SerialVersionUID(-7280407024398984674L) +class ScalaStreamletWithFilterAndTransform(args: Array[String]) + extends AbstractTestTopology(args) + with ScalaIntegrationTestBase { + + override protected def buildTopology( + testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = { + val atomicInteger = new AtomicInteger + + val streamletBuilder = Builder.newBuilder + + streamletBuilder + .newSource(() => atomicInteger.getAndIncrement()) + .setName("incremented-numbers") + .filter((i: Int) => i <= 7) + .setName("positive-numbers-lower-than-8") + .transform[String](new TextTransformer()) + .setName("numbers-transformed-to-text") + + build(testTopologyBuilder, streamletBuilder) + } + +} + +private class TextTransformer extends SerializableTransformer[Int, String] { + private val alphabet = List("a", "b", "c", "d", "e", "f", "g", "h") + + override def setup(context: Context): Unit = {} + + override def transform(i: Int, fun: String => Unit): Unit = + fun(s"${alphabet(i)}-$i".toUpperCase) + + override def cleanup(): Unit = {} +} diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json new file mode 100644 index 0000000..dda7256 --- /dev/null +++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json @@ -0,0 +1 @@ +["A-0", "B-1", "C-2", "D-3", "E-4", "F-5", "G-6", "H-7"] \ No newline at end of file diff --git a/scripts/applatix/javatests.sh b/scripts/applatix/javatests.sh index 0744aaa..e19d5df 100755 --- a/scripts/applatix/javatests.sh +++ b/scripts/applatix/javatests.sh @@ -9,14 +9,29 @@ source ${DIR}/testutils.sh # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar" +SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar" -# run the java integration test -T="heron integration_test java" +# initialize http-server for integration tests +T="heron integration_test http-server initialization" start_timer "$T" ${HOME}/bin/http-server 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +end_timer "$T" + +# run the scala integration test +T="heron integration_test scala" +start_timer "$T" +${HOME}/bin/test-runner \ + -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080\ + -tp ${HOME}/.herontests/data/scala \ + -cl local -rl heron-staging -ev devel +end_timer "$T" +# run the java integration test +T="heron integration_test java" +start_timer "$T" ${HOME}/bin/test-runner \ -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ -rh localhost -rp 8080\ diff --git a/scripts/applatix/test.sh b/scripts/applatix/test.sh index c346dea..8f7391d 100755 --- a/scripts/applatix/test.sh +++ b/scripts/applatix/test.sh @@ -18,6 +18,7 @@ export PATH=${HOME}/bin:$PATH # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar" PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex" +SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar" # install client T="heron client install" @@ -37,13 +38,27 @@ start_timer "$T" python ${UTILS}/save-logs.py "heron_tests_install.txt" ./heron-tests-install.sh --user end_timer "$T" -# run the java integration test -T="heron integration_test java" +# initialize http-server for integration tests +T="heron integration_test http-server initialization" start_timer "$T" ${HOME}/bin/http-server 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +end_timer "$T" + +# run the scala integration test +T="heron integration_test scala" +start_timer "$T" +${HOME}/bin/test-runner \ + -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080\ + -tp ${HOME}/.herontests/data/scala \ + -cl local -rl heron-staging -ev devel +end_timer "$T" +# run the java integration test +T="heron integration_test java" +start_timer "$T" ${HOME}/bin/test-runner \ -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ -rh localhost -rp 8080\ diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD index facfb2b..eada6e2 100644 --- a/scripts/packages/BUILD +++ b/scripts/packages/BUILD @@ -344,6 +344,15 @@ pkg_tar( ) pkg_tar( + name = "heron-tests-data-scala", + package_dir = "data/scala", + srcs = [ + "//integration_test/src/scala:test-data-files", + ], + strip_prefix = '/integration_test/src/scala/com/twitter/heron/integration_test/topology/' +) + +pkg_tar( name = "heron-tests-data-java", package_dir = "data/java", srcs = [ @@ -365,6 +374,7 @@ pkg_tar( name = "heron-tests-lib", package_dir = "lib", srcs = [ + "//integration_test/src/scala:scala-integration-tests", "//integration_test/src/java:integration-tests", "//integration_test/src/python/integration_test/topology:heron_integ_topology", ], @@ -376,6 +386,7 @@ pkg_tar( srcs = generated_release_files, deps = [ ":heron-tests-bin", + ":heron-tests-data-scala", ":heron-tests-data-java", ":heron-tests-data-python", ":heron-tests-lib", diff --git a/scripts/run_integration_test.sh b/scripts/run_integration_test.sh index 903f879..d70881b 100755 --- a/scripts/run_integration_test.sh +++ b/scripts/run_integration_test.sh @@ -8,10 +8,12 @@ TEST_RUNNER="./bazel-bin/integration_test/src/python/test_runner/test-runner.pex JAVA_TESTS_DIR="integration_test/src/java/com/twitter/heron/integration_test/topology" PYTHON_TESTS_DIR="integration_test/src/python/integration_test/topology" +SCALA_TESTS_DIR="integration_test/src/scala/com/twitter/heron/integration_test/topology" # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/java/integration-tests.jar" PYTHON_INTEGRATION_TESTS_BIN="${PWD}/bazel-bin/integration_test/src/python/integration_test/topology/heron_integ_topology.pex" +SCALA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/scala/scala-integration-tests.jar" CORE_PKG="file://${PWD}/bazel-bin/scripts/packages/heron-core.tar.gz" @@ -28,6 +30,13 @@ ${HTTP_SERVER} 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +# run the scala integration tests +${TEST_RUNNER} \ + -hc ~/.heron/bin/heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080 \ + -tp ${SCALA_TESTS_DIR} \ + -cl local -rl heron-staging -ev devel -pi ${CORE_PKG} + # run the java integration tests ${TEST_RUNNER} \ -hc ~/.heron/bin/heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ diff --git a/scripts/travis/test.sh b/scripts/travis/test.sh index 420a56f..2cece60 100755 --- a/scripts/travis/test.sh +++ b/scripts/travis/test.sh @@ -15,6 +15,7 @@ echo "Using $PLATFORM platform" # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar" PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex" +SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar" # build test related jar T="heron build integration_test" @@ -40,13 +41,27 @@ start_timer "$T" python ./bazel-bin/integration_test/src/python/local_test_runner/local-test-runner end_timer "$T" -# run the java integration test -T="heron integration_test java" +# initialize http-server for integration tests +T="heron integration_test http-server initialization" start_timer "$T" ${HOME}/bin/http-server 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +end_timer "$T" + +# run the scala integration test +T="heron integration_test scala" +start_timer "$T" +${HOME}/bin/test-runner \ + -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080\ + -tp ${HOME}/.herontests/data/scala \ + -cl local -rl heron-staging -ev devel +end_timer "$T" +# run the java integration test +T="heron integration_test java" +start_timer "$T" ${HOME}/bin/test-runner \ -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ -rh localhost -rp 8080\ -- To stop receiving notification emails like this one, please contact karth...@apache.org.