http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java deleted file mode 100644 index 0d0638d..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import junit.framework.Assert; -import org.apache.eagle.dataproc.impl.aggregate.SimpleAggregateExecutor; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity; -import org.junit.Test; -import scala.Tuple2; - -import java.util.ArrayList; -import java.util.List; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Created on 1/20/16. - */ -public class TestSimpleAggregateExecutor { - - @Test - public void test() throws Exception { - SimpleAggregateExecutor sae = new SimpleAggregateExecutor(new String[]{"s1"}, - "define stream s1(eagleAlertContext object, timestamp long, metric string);" + - " @info(name='query')" + - " from s1 select * insert into tmp;" - , - "siddhiCEPEngine", - 0, - 1); - - Config config = ConfigFactory.empty(); - sae.prepareConfig(config); - sae.init(); - - List<Object> tuple = new ArrayList<>(3); - tuple.add(0, "groupbykey"); - tuple.add(1, "s1"); - SortedMap value = new TreeMap(); - value.put("timestamp", System.currentTimeMillis()); - value.put("metric", "name-of-the-metric"); - tuple.add(2, value); - - final AtomicInteger count = new AtomicInteger(); - sae.flatMap(tuple, new Collector<Tuple2<String, AggregateEntity>>(){ - @Override - public void collect(Tuple2<String, AggregateEntity> stringAggregateEntityTuple2) { - System.out.print(stringAggregateEntityTuple2._1()); - count.incrementAndGet(); - } - }); - - Thread.sleep(3000); - Assert.assertEquals(1, count.get()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java deleted file mode 100644 index ed5d705..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import junit.framework.Assert; -import org.apache.eagle.datastream.core.*; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.partition.PartitionStrategy; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.jgrapht.experimental.dag.DirectedAcyclicGraph; -import org.junit.Before; -import org.junit.Test; -import scala.collection.Seq; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - - -/** - * @since Dec 18, 2015 - * - */ -public class TestStreamAggregate { - - private Config config; - - @SuppressWarnings("serial") - private final class SimpleSpout extends BaseRichSpout { - @SuppressWarnings("rawtypes") - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - } - @Override - public void nextTuple() { - } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - } - - public static class TestEnvironment extends StormExecutionEnvironment { - private static final long serialVersionUID = 1L; - public TestEnvironment(Config conf) { - super(conf); - } - @Override - public void execute(StreamDAG dag) { - System.out.println("DAT completed!"); - } - } - - public static class DummyStrategy implements PartitionStrategy { - private static final long serialVersionUID = 1L; - @Override - public int balance(String key, int buckNum) { - return 0; - } - }; - - public static class DummyExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> { - @Override - public void prepareConfig(Config config) { - } - @Override - public void init() { - } - @Override - public void flatMap(List input, Collector collector) { - } - } - - @Before - public void setUp() { - System.setProperty("config.resource", "/application.conf"); - ConfigFactory.invalidateCaches(); - config = ConfigFactory.load(); - } - - @SuppressWarnings({ "unchecked", "rawtypes", "serial" }) - @Test - public void testAggregate1() { - StormExecutionEnvironment exe = new TestEnvironment(config); - - BaseRichSpout spout = new SimpleSpout(); - StormSourceProducer ssp = exe.fromSpout(spout); - - ssp.flatMap(new FlatMapper<String>() { - @Override - public void flatMap(Seq<Object> input, Collector<String> collector) { - // do nothing - } - }).aggregate(Arrays.asList("c3EsLogEventStream"), "qid", new DummyStrategy()); - - try { - exe.execute(); - Assert.fail("customzied flat mapper(non java storm executor) before analyze is not supported!"); - } catch (Exception e ){ - } - } - - @SuppressWarnings({ "unchecked", "rawtypes", "serial" }) - @Test - public void testAggregate() { - StormExecutionEnvironment exe = new TestEnvironment(config); - StormSourceProducer ssp = exe.fromSpout(new SimpleSpout()); - DummyExecutor dummy = new DummyExecutor(); - ssp.flatMap(dummy).aggregate(Arrays.asList("c3EsLogEventStream"), "analyzeStreamExecutor", new DummyStrategy()); - - try { - exe.execute(); - } catch (Exception e) { - Assert.fail("customized flat mapper before"); - } - // Assertion - DirectedAcyclicGraph<StreamProducer<Object>, StreamConnector<Object, Object>> dag = exe.dag(); - Assert.assertEquals("three vertex", 3, dag.vertexSet().size()); - boolean hasWrapped = false; - for (StreamProducer<Object> obj : dag.vertexSet()) { - if (obj instanceof FlatMapProducer) { - if (((FlatMapProducer) obj).mapper() instanceof JavaStormExecutorForAlertWrapper) { - hasWrapped = true; - Assert.assertEquals("dummy executor should be wrapped in the alert wrapper func", dummy, - ((JavaStormExecutorForAlertWrapper) ((FlatMapProducer) obj).mapper() ).getDelegate()); - - } - } - } - Assert.assertTrue(hasWrapped); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf deleted file mode 100644 index 87b1947..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf +++ /dev/null @@ -1,79 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - "envContextConfig" : { - "env" : "storm", - "mode" : "local", - "topologyName" : "kafka-monitor-topology", - "parallelismConfig" : { - "kafkaMsgConsumer" : 1 - } - }, - "dataSourceConfig": { - "topic" : "nn_jmx_metric_sandbox", - "zkConnection" : "localhost:2181", - "zkConnectionTimeoutMS" : 15000, - "consumerGroupId" : "EagleConsumer", - "fetchSize" : 1048586, - "transactionZKServers" : "localhost", - "transactionZKPort" : 2181, - "transactionZKRoot" : "/consumers", - "transactionStateUpdateMS" : 2000 - }, - "alertExecutorConfigs" : { - "eventStreamExecutor" : { - "parallelism" : 1, - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } - }, - "persistExecutorConfigs" { - "persistExecutor1" : { - "kafka": { - "bootstrap_servers" : "localhost", - "topics" : { - "defaultOutput" : "downSampleOutput" - } - } - } - }, - "aggregateExecutorConfigs" : { - "aggregateStreamExecutor" : { - "parallelism" : 1, - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } - }, - "eagleProps" : { - "site" : "sandbox", - "application": "eventSource", - "dataJoinPollIntervalSec" : 30, - "mailHost" : "mail.host.com", - "mailSmtpPort":"25", - "mailDebug" : "true", - "eagleService": { - "host": "localhost", - "port": 38080, - "username": "admin", - "password": "secret" - } - }, - "dynamicConfigSource" : { - "enabled" : true, - "initDelayMillis" : 0, - "delayMillis" : 30000 - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh deleted file mode 100644 index 3e57b6c..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/event-metadata-init.sh +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -export EAGLE_SERVICE_USER="admin" -export EAGLE_SERVICE_PASSWD="secret" -export EAGLE_SERVICE_HOST="localhost" -export EAGLE_SERVICE_PORT=38080 - -# AlertDataSource: data sources bound to sites -echo "Importing AlertDataSourceService for stream... " - -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"eventSource"}, "enabled": "true", "config" : ""}]' - -## AlertStreamService: alert streams generated from data source -echo "" -echo "Importing AlertStreamService for stream... " -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" -d '[{"prefix":"alertStream","tags":{"dataSource":"eventSource","streamName":"eventStream"},"desc":"alert event stream from hdfs audit log"}]' - -## AlertExecutorService: what alert streams are consumed by alert executor -echo "" -echo "Importing AlertExecutorService for stream ... " -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"eventSource","alertExecutorId":"eventStreamExecutor","streamName":"eventStream"},"desc":"alert executor for event stream"}]' - -## AlertStreamSchemaService: schema for event from alert stream -echo "" -echo "Importing AlertStreamSchemaService for stream ... " -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"timestamp"},"attrDescription":"event timestamp","attrType":"long","category":"","attrValueResolver":""},{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"name"},"attrDescription":"event name","attrType":"string","category":"","attrValueResolver":""},{"prefix":"alertStreamSchema","tags":{"dataSource":"eventSource","streamName":"eventStream","attrName":"value"},"attrDescription":"event value","attrType":"integer","category":"","attrValueResolver":""}]' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties deleted file mode 100644 index 3499c46..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/log4j.properties +++ /dev/null @@ -1,35 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -log4j.rootLogger=INFO, stdout - - eagle.log.dir=./logs - eagle.log.file=eagle.log - -# standard output -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n - -# Daily Rolling File Appender - log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender - log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file} - log4j.appender.DRFA.DatePattern=.yyyy-MM-dd -## 30-day backup -# log4j.appender.DRFA.MaxBackupIndex=30 - log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala deleted file mode 100644 index 9d378c9..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import org.apache.eagle.datastream.storm.StormWrapperUtils -import org.scalatest.{FlatSpec, Matchers} - -class StormWrapperUtilsSpec extends FlatSpec with Matchers{ - import StormWrapperUtils._ - "StormWrapperUtils" should "convert Tuple{1,2,3,..} to java.util.List" in { - val list1 = productAsJavaList(new Tuple1("a")) - list1.size() should be(1) - list1.get(0) should be("a") - - val list2 = productAsJavaList(new Tuple2("a","b")) - list2.size() should be(2) - list2.get(0) should be("a") - list2.get(1) should be("b") - - val list3 = productAsJavaList(new Tuple3("a","b","c")) - list3.size() should be(3) - list3.get(0) should be("a") - list3.get(1) should be("b") - list3.get(2) should be("c") - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala deleted file mode 100644 index 4339e5a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.eagle.datastream.storm.StormExecutionEnvironment - -object testStreamUnionExpansion extends App{ - val config : Config = ConfigFactory.load - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)) - tail1.streamUnion(List(tail2)).map1(a => "xyz") -} - -object testStreamGroupbyExpansion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a)) - //env.execute -} - -object testStreamUnionAndGroupbyExpansion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0) - tail1.streamUnion(List(tail2)).map1(a => "xyz") -// env.execute() -} - -/** - * 1. stream schema - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * 2. policy - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]' - */ -object testAlertExpansion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1") - .flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend") - .alertWithConsumer("s1", "alert1") - //env.execute -} - -/** - * 1. stream schema - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * 2. policy - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]' - */ -object testAlertExpansionWithUnion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1").flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend") //.map2(a => ("key1",a)) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a)) - tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = true) - //env.execute -} - - -object testStreamUnionExpansionWithSharedSpout extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val source = env.fromSpout(TestSpout()) - val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - val tail2 = source.flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)) - tail1.streamUnion(List(tail2)).map1(a => { - println(a) - "xyz" - }) -// env.execute -} - -object testStreamUnionExpansionWithSharedSpout_2 extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val source = env.fromSpout(TestSpout()) - val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - source.streamUnion(List(tail1)).map1(a => { - println(a) - "xyz" - }) -// env.execute -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala deleted file mode 100644 index ccd3deb..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import com.typesafe.config.ConfigFactory -import org.apache.eagle.datastream.storm.StormExecutionEnvironment - -/** - * @since 12/5/15 - */ -object TestExecutionEnvironment extends App{ - val env0 = ExecutionEnvironments.get[StormExecutionEnvironment] - println(env0) - val config = ConfigFactory.load() - val env1 = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](config) - println(env1) - val env2 = ExecutionEnvironments.get[StormExecutionEnvironment](Array[String]("-D","key=value")) - println(env2) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala deleted file mode 100644 index d785689..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -trait A { val x: Int } -case class B(val x: Int, y: Int) extends A \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala deleted file mode 100644 index 231ebab..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import java.util - -import backtype.storm.spout.SpoutOutputCollector -import backtype.storm.task.TopologyContext -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichSpout -import backtype.storm.tuple.Fields -import com.typesafe.config.Config -import org.slf4j.LoggerFactory - -case class TestSpout() extends BaseRichSpout { - val LOG = LoggerFactory.getLogger(TestSpout.getClass) - var _collector : SpoutOutputCollector = null - override def nextTuple : Unit = { - _collector.emit(util.Arrays.asList("abc")) - LOG.info("send spout data abc") - Thread.sleep(1000) - } - override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={ - declarer.declare(new Fields("value")) - } - override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={ - _collector = collector - } -} - -case class TestKeyValueSpout() extends BaseRichSpout { - val LOG = LoggerFactory.getLogger(TestSpout.getClass) - var _collector : SpoutOutputCollector = null - var count : Int = 0 - override def nextTuple : Unit = { - if(count%3 == 0) { - _collector.emit(util.Arrays.asList("abc", new Integer(1))) - }else{ - _collector.emit(util.Arrays.asList("xyz", new Integer(1))) - } - count += 1; - LOG.info("send spout data abc/xyz") - Thread.sleep(1000) - } - override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={ - declarer.declare(new Fields("word", "count")) - } - override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={ - _collector = collector - } -} - -case class EchoExecutor() extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(EchoExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - outputCollector.collect(Tuple1(input.head.asInstanceOf[String])) - LOG.info("echo " + input.head) - } -} - -case class WordPrependExecutor(prefix : String) extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - outputCollector.collect(Tuple1(prefix + "_" + input.head)) - LOG.info("preappend " + prefix + "_" + input.head) - } -} - -case class WordPrependForAlertExecutor(prefix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={ - val value = new util.TreeMap[Object, Object]() - value.put("word", prefix + "_" + input.head) - outputCollector.collect(Tuple2("key1",value)) - LOG.info("preappend " + prefix + "_" + input.head) - } -} - -case class WordPrependForAlertExecutor2(prefix : String) extends StormStreamExecutor1[util.SortedMap[Object, Object]] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[util.SortedMap[Object, Object]]]): Unit ={ - val value = new util.TreeMap[Object, Object]() - value.put("word", prefix + "_" + input.head) - outputCollector.collect(Tuple1(value)) - LOG.info("preappend " + prefix + "_" + input.head) - } -} - -case class WordAppendExecutor(suffix : String) extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - outputCollector.collect(Tuple1(input.head + "_" + suffix)) - LOG.info("append " + input.head + "_" + suffix) - } -} - -case class WordAppendForAlertExecutor(suffix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={ - val value = new util.TreeMap[Object, Object]() - value.put("word", input.head + "_" + suffix) - outputCollector.collect(Tuple2("key1", value)) - LOG.info("append " + input.head + "_" + suffix) - } -} - -case class PatternAlertExecutor(pattern : String) extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(PatternAlertExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - LOG.info("send out " + input.head) - if(input.head.asInstanceOf[String].matches(pattern)){ - LOG.info("Alert hadppens for input " + input.head + " and for pattern " + pattern) - } - } -} - -case class GroupedEchoExecutor() extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(GroupedEchoExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - LOG.info("get " + input(0)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala deleted file mode 100644 index c071eb2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import com.typesafe.config.{Config, ConfigFactory} - -/** - * explicit union - * a.union(b,c).alert() means (a,b,c)'s output is united into alert() - * before running this testing, we should define in eagle service one policy and one stream schema - * 1. stream schema - * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * 2. policy - * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]' - */ -object UnionForAlert extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a)) - tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false) - env.execute() -} - -/** - * test alert after flatMap - */ -object TestAlertAfterFlatMap extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()) - .flatMap(WordPrependForAlertExecutor("test")) - .alert(Seq("s1"), "alert1", consume = false) - //env.execute -} - -/** - * test alert after Map - */ -object TestAlertAfterMap extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()) - .flatMap(WordPrependForAlertExecutor2("test")) - .map2(a => ("key", a)) - .alert(Seq("s1"), "alert1", false) - //env.execute -} - -object StormRunnerWithoutSplitOrJoin extends Application{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")) - .flatMap(PatternAlertExecutor("test.*")) -// env.execute() -} - -object StormRunnerWithSplit extends Application{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val toBeSplit = env.fromSpout(TestSpout()).flatMap(EchoExecutor()) - toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*")) - toBeSplit.flatMap(WordAppendExecutor("test")) -// env.execute() -} - -object StormRunnerWithUnion extends Application{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependExecutor("test")) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendExecutor("test")) - tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*")) - env.execute() -} - -object StormRunnerWithFilter extends Application{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")). - filter(_=>false). - flatMap(PatternAlertExecutor("test.*")) - //env.execute -} - -object StormRunnerWithJavaExecutor extends Application{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")). - filter(_=>false). - flatMap(PatternAlertExecutor("test.*")) - //env.execute -} - -object StormRunnerWithKeyValueSpout extends Application{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).parallelism(2) - //env.execute -} - -object StormRunnerWithKeyValueSpoutRenameOutputFields extends Application{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestKeyValueSpout()).withOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).parallelism(2) - //env.execute -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala deleted file mode 100644 index 48dc7e5..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import com.typesafe.config.ConfigFactory -import org.scalatest.{FlatSpec, Matchers} - -class TestStreamDAGBuilder extends FlatSpec with Matchers{ -// "a single source DAG with groupBy" should "be traversed without groupBy node" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val tail = env.newSource(null).flatMap(EchoExecutor()).groupBy(0).flatMap(WordPrependExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source DAG with groupBy from spout" should "be traversed without groupBy node" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val tail = env.newSource(null).groupBy(0).flatMap(WordPrependExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source DAG with groupBy from spout and then split" should "be traversed without groupBy node" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val groupby = env.newSource(null).groupBy(0) -// groupby.flatMap(WordPrependExecutor("test")) -// groupby.flatMap(WordAppendExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source DAG without stream join" should "be traversed sequentially like specified" in{ -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val tail = env.newSource(null).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source with split" should "has more than one tail producer" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val echo = env.newSource(null).flatMap(EchoExecutor()) -// val tail1 = echo.flatMap(WordPrependExecutor("test")) -// val tail2 = echo.flatMap(WordAppendExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source with split and join" should "has join" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val echo = env.newSource(null).flatMap(EchoExecutor()) -// val tail1 = echo.flatMap(WordPrependExecutor("test")) -// val tail2 = echo.flatMap(WordAppendExecutor("test")).filter(_=>true).streamUnion(List(tail1)). -// flatMap(PatternAlertExecutor("test*")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FilterProducer(fn) => -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("test*")) -// case _ => assert(false) -// } -// assert(!iter.hasNext) -// } -// -// "multiple sources with split and union" should "has union" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val source1 = env.newSource(TestSpout()) -// val source2 = env.newSource(TestSpout()) -// val source3 = env.newSource(TestSpout()) -// -// val tail1 = source1.flatMap(WordPrependExecutor("test")) -// val tail2 = source2.filter(_=>true) -// val tail3 = source3.flatMap(WordAppendExecutor("test")).streamUnion(List(tail1, tail2)). -// flatMap(PatternAlertExecutor("abc*")) -// -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t.isInstanceOf[TestSpout]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t.isInstanceOf[String]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FilterProducer(fn) => -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case SourceProducer(t) => assert(t.isInstanceOf[String]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("abc*")) -// case _ => assert(false) -// } -// assert(!iter.hasNext) -// } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala deleted file mode 100644 index 7754765..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream - -import org.apache.eagle.datastream.core.StreamContext -import org.apache.eagle.datastream.storm.StormExecutionEnvironment - -/** - * @since 12/4/15 - */ -case class Entity(name:String,value:Double,var inc:Int=0) - -object TestIterableWithGroupBy extends App { - - val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - env.from(tuples) - .groupByKey(_.name) - .map(o => {o.inc += 2;o}) - .filter(_.name != "b") - .filter(_.name != "c") - .groupByKey(o=>(o.name,o.value)) - .map(o => (o.name,o)) - .map(o => (o._1,o._2.value,o._2.inc)) - .foreach(println) - - env.execute() -} - -object TestIterableWithGroupByWithStreamContext extends App { - val stream = StreamContext(args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - stream.from(tuples) - .groupByKey(_.name) - .map(o => {o.inc += 2;o}) - .filter(_.name != "b") - .filter(_.name != "c") - .groupByKey(o=>(o.name,o.value)) - .map(o => (o.name,o)) - .map(o => (o._1,o._2.value,o._2.inc)) - .foreach(println) - - stream.submit[StormExecutionEnvironment] -} - -object TestIterableWithGroupByCircularly extends App{ - val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - env.from(tuples,recycle = true) - .map(o => {o.inc += 2;o}) - .groupByKey(_.name) - .foreach(println) - env.execute() -} - -object TestGroupByKeyOnSpoutproxy extends App{ - val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - env.fromSpout[String](TestSpout()) - .groupByKey(_.charAt(0)) - .foreach(println) - env.execute() -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml b/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml deleted file mode 100644 index 740de03..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/pom.xml +++ /dev/null @@ -1,130 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>eagle</groupId> - <artifactId>eagle-data-process-parent</artifactId> - <version>0.3.0</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>eagle-stream-process-base</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-math3</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.version}</artifactId> - </dependency> - <dependency> - <groupId>com.typesafe</groupId> - <artifactId>config</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - <dependency> - <groupId>eagle</groupId> - <artifactId>eagle-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> - <scope>test</scope> - </dependency> - <!--<dependency>--> - <!--<groupId>org.apache.storm</groupId>--> - <!--<artifactId>storm-core</artifactId>--> - <!--<exclusions>--> - <!--<exclusion>--> - <!--<groupId>ch.qos.logback</groupId>--> - <!--<artifactId>logback-classic</artifactId>--> - <!--</exclusion>--> - <!--<exclusion>--> - <!--<groupId>log4j</groupId>--> - <!--<artifactId>log4j</artifactId>--> - <!--</exclusion>--> - <!--<exclusion>--> - <!--<groupId>org.slf4j</groupId>--> - <!--<artifactId>log4j-over-slf4j</artifactId>--> - <!--</exclusion>--> - <!--</exclusions>--> - <!--</dependency>--> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <executions> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>add-source</goal> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <junitxml>.</junitxml> - <filereports>TestSuite.txt</filereports> - </configuration> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java deleted file mode 100644 index 0c4b1ab..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.core; - -import java.io.Serializable; - -/** - * expose simple interface for streaming executor to populate output data - * - */ -public interface EagleOutputCollector extends Serializable{ - void collect(ValuesArray t); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java deleted file mode 100644 index 5df7e55..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.core; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.ObjectMapper; - -public class JsonSerDeserUtils { - private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeserUtils.class); - - public static <T> String serialize(T o) throws Exception{ - return serialize(o, null); - } - - public static <T> String serialize(T o, List<Module> modules) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - if (modules != null) { - mapper.registerModules(modules); - } - return mapper.writeValueAsString(o); - } - - public static <T> T deserialize(String value, Class<T> cls) throws Exception{ - return deserialize(value, cls, null); - } - - public static <T> T deserialize(String value, Class<T> cls, List<Module> modules) throws Exception{ - ObjectMapper mapper = new ObjectMapper(); - if (modules != null) { - mapper.registerModules(modules); - } - return mapper.readValue(value, cls); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java deleted file mode 100644 index fc2a016..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.core; - -public class StreamingProcessConstants { - public static final String EVENT_PARTITION_KEY = "eventPartitionKey"; - public static final String EVENT_STREAM_NAME = "streamName"; - public static final String EVENT_ATTRIBUTE_MAP = "value"; -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java deleted file mode 100644 index 9971cb2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.core; - -import java.util.ArrayList; - -/** - * multiple datapoints are stored within one ValuesArray object - * sent out - */ -public class ValuesArray extends ArrayList<Object>{ - private static final long serialVersionUID = -8218427810421668178L; - - public ValuesArray() { - - } - - public ValuesArray(Object... vals) { - super(vals.length); - for(Object o: vals) { - add(o); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java deleted file mode 100644 index f40bb76..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.util; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.commons.cli.*; - -import java.util.Map; - -/** - * @since 8/22/15 - */ -public abstract class AbstractConfigOptionParser { - - // private final Options options; - private final Parser parser; - - public AbstractConfigOptionParser(){ - parser = parser(); - //options = options(); - } - - /** - * @return Parser - */ - protected abstract Parser parser(); - - /** - * @return Options - */ - protected abstract Options options(); - - public abstract Map<String,String> parseConfig(String[] arguments) throws ParseException; - - /** - * Load config as system properties - * - * @param arguments command line arguments - * @throws ParseException - */ - public Config load(String[] arguments) throws ParseException { - Map<String,String> configProps = parseConfig(arguments); - for(Map.Entry<String,String> entry:configProps.entrySet()){ - System.setProperty(entry.getKey(),entry.getValue()); - } - System.setProperty("config.trace", "loads"); - return ConfigFactory.load(); - } - - public CommandLine parse(String[] arguments) throws ParseException { - return this.parser.parse(this.options(),arguments); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java deleted file mode 100644 index bbd4e38..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.util; - -import org.apache.commons.cli.*; - -import java.util.HashMap; -import java.util.Map; - - -public class ConfigOptionParser extends AbstractConfigOptionParser { - private final static String CONFIG_OPT_FLAG = "D"; - - @Override - protected Parser parser() { - return new BasicParser(); - } - - @Override - protected Options options() { - Options options = new Options(); - options.addOption(CONFIG_OPT_FLAG, true, "Config properties in format of \"-D key=value\""); - return options; - } - - @Override - public Map<String,String> parseConfig(String[] arguments) throws ParseException { - CommandLine cmd = parse(arguments); - return parseCommand(cmd); - } - - protected Map<String,String> parseCommand(CommandLine cmd) throws ParseException { - Map<String,String> result = new HashMap<>(); - if(cmd.hasOption(CONFIG_OPT_FLAG)){ - String[] values = cmd.getOptionValues(CONFIG_OPT_FLAG); - for(String value:values){ - int eqIndex = value.indexOf("="); - if(eqIndex>0 && eqIndex<value.length()){ - String k = value.substring(0,eqIndex); - String v = value.substring(eqIndex+1,value.length()); - if(result.containsKey(k)){ - throw new ParseException("Duplicated "+CONFIG_OPT_FLAG+" "+value); - }else{ - result.put(k,v); - } - }else{ - throw new ParseException("Invalid format: -"+CONFIG_OPT_FLAG+" "+value+", required: -"+CONFIG_OPT_FLAG+" key=value"); - } - } - } - return result; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java deleted file mode 100644 index 7e66478..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream; - -import java.util.List; - -public interface JavaMapper { - List<Object> map(List<Object> input); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java deleted file mode 100644 index 39ce6c2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.apache.eagle.datastream; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * @since 12/8/15 - */ -public interface JavaTypeCompatible { - Class<?> getType(); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java deleted file mode 100644 index 0b17775..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.partition; - -import java.io.Serializable; -import java.util.List; - -public interface DataDistributionDao extends Serializable { - - List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java deleted file mode 100644 index 0614388..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.partition; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public interface PartitionAlgorithm extends Serializable { - Map<String, Integer> partition(List<Weight> weights, int k); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java deleted file mode 100644 index e431f28..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.partition; - -import java.io.Serializable; - -public interface PartitionStrategy extends Serializable { - - int balance(String key, int buckNum); -}