http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala deleted file mode 100644 index cf9b53f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/FilterBoltWrapper.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -import java.util - -import backtype.storm.task.{OutputCollector, TopologyContext} -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichBolt -import backtype.storm.tuple.{Fields, Tuple} -import org.slf4j.LoggerFactory - -case class FilterBoltWrapper[T](fn : T => Boolean) extends BaseRichBolt{ - val LOG = LoggerFactory.getLogger(FilterBoltWrapper.getClass) - var _collector : OutputCollector = null - - override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = { - _collector = collector - } - - override def execute(input : Tuple): Unit = { - input.getValue(0) match { - case v:T => - if(fn(v)){ - _collector.emit(input, input.getValues) - _collector.ack(input) - } - } - } - - override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={ - declarer.declare(new Fields(OutputFieldNameConst.FIELD_PREFIX + "0")) - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala deleted file mode 100644 index 24afdbb..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/GraphPrinter.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory -import scala.collection.JavaConversions._ - -object GraphPrinter { - private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass) - def print(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit ={ - val iter = dag.iterator() - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - LOG.info(edge.from + "{" + edge.from.parallelism + "}" +" => " + edge.to + "{" + edge.to.parallelism + "}" + " with groupByFields " + edge.groupByFields) - }) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala deleted file mode 100644 index 13be3c7..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/JavaStormBoltWrapper.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -import java.util - -import backtype.storm.task.{OutputCollector, TopologyContext} -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichBolt -import backtype.storm.tuple.{Fields, Tuple} -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - -case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) extends BaseRichBolt{ - val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass) - var _collector : OutputCollector = null - - override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = { - _collector = collector - worker.init - } - - override def execute(input : Tuple): Unit ={ - worker.flatMap(input.getValues, new Collector[EagleTuple](){ - def collect(t: EagleTuple): Unit ={ - _collector.emit(input, t.getList.asJava) - } - }) - _collector.ack(input) - } - - override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={ - val fields = worker.fields - LOG.info("output fields for worker " + worker + " : " + fields.toList) - declarer.declare(new Fields(fields:_*)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala deleted file mode 100644 index 7407c32..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/MapBoltWrapper.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -import java.util - -import backtype.storm.task.{OutputCollector, TopologyContext} -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichBolt -import backtype.storm.tuple.{Fields, Tuple} -import org.slf4j.LoggerFactory - -/** - * @since 9/29/15 - */ -case class MapBoltWrapper[T,R](num: Int, fn: T => R) extends BaseRichBolt { - val LOG = LoggerFactory.getLogger(FilterBoltWrapper.getClass) - var _collector : OutputCollector = null - - override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { - var fields = new util.ArrayList[String]() - var i : Int = 0; - while(i < num){ - fields.add(OutputFieldNameConst.FIELD_PREFIX + i) - i += 1 - } - declarer.declare(new Fields(fields)) - } - - override def execute(input: Tuple): Unit = { - val size = input.size() - var values : AnyRef = null - size match { - case 1 => values = scala.Tuple1(input.getValue(0)) - case 2 => values = scala.Tuple2(input.getValue(0), input.getValue(1)) - case 3 => values = scala.Tuple3(input.getValue(0), input.getValue(1), input.getValue(2)) - case 4 => values = scala.Tuple4(input.getValue(0), input.getValue(1), input.getValue(2), input.getValue(3)) - case _ => throw new IllegalArgumentException - } - val output = fn(values.asInstanceOf[T]) - output match { - case scala.Tuple1(a) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef])) - case scala.Tuple2(a, b) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef])) - case scala.Tuple3(a, b, c) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef])) - case scala.Tuple4(a, b, c, d) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef])) - case a => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef])) - } - _collector.ack(input) - } - - override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = { - _collector = collector - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala deleted file mode 100644 index f579881..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/NodeNameSelector.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -case class NodeNameSelector(producer : StreamProducer) { - def getName : String = { - producer.name match { - case null => producer.toString - case _ => producer.name - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala deleted file mode 100644 index b00f149..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/OutputFieldNameConst.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -object OutputFieldNameConst { - val FIELD_PREFIX = "f" -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala deleted file mode 100644 index 7bed261..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/SpoutProxy.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -import backtype.storm.spout.SpoutOutputCollector -import backtype.storm.task.TopologyContext -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichSpout -import backtype.storm.tuple.Fields - -/** - * Declare delegated BaseRichSpout with given field names - * - * @param delegate delegated BaseRichSpout - * @param outputFields given field names - */ -case class SpoutProxy(delegate: BaseRichSpout, outputFields: java.util.List[String]) extends BaseRichSpout{ - def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector) { - this.delegate.open(conf, context, collector) - } - - def nextTuple { - this.delegate.nextTuple - } - - override def ack(msgId: AnyRef) { - this.delegate.ack(msgId) - } - - override def fail(msgId: AnyRef) { - this.delegate.fail(msgId) - } - - override def deactivate { - this.delegate.deactivate - } - - override def declareOutputFields(declarer: OutputFieldsDeclarer) { - declarer.declare(new Fields(outputFields)) - } - - override def close { - this.delegate.close - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala deleted file mode 100644 index c35b3e9..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltFactory.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package eagle.datastream - -import backtype.storm.topology.base.BaseRichBolt -import com.typesafe.config.Config - -object StormBoltFactory { - def getBoltWrapper(graph: AbstractStreamProducerGraph, producer : StreamProducer, config : Config) : BaseRichBolt = { - producer match{ - case FlatMapProducer(id, worker) => { - if(worker.isInstanceOf[JavaStormStreamExecutor[EagleTuple]]){ - worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]].prepareConfig(config) - JavaStormBoltWrapper(worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]]) - }else if(worker.isInstanceOf[StormStreamExecutor[EagleTuple]]){ - worker.asInstanceOf[StormStreamExecutor[EagleTuple]].prepareConfig(config) - StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[EagleTuple]]) - }else { - throw new UnsupportedOperationException - } - } - case FilterProducer(id, fn) => { - FilterBoltWrapper(fn) - } - case MapProducer(id, n, fn) => { - MapBoltWrapper(n, fn) - } - case _ => throw new UnsupportedOperationException - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala deleted file mode 100644 index 5a953c9..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormBoltWrapper.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -import java.util - -import backtype.storm.task.{OutputCollector, TopologyContext} -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichBolt -import backtype.storm.tuple.{Fields, Tuple} -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - -case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends BaseRichBolt{ - val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass) - var _collector : OutputCollector = null - - override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = { - _collector = collector - worker.init - } - - override def execute(input : Tuple): Unit = { - try { - worker.flatMap(input.getValues.asScala, new Collector[EagleTuple] { - override def collect(t: EagleTuple): Unit = { - _collector.emit(input, t.getList.asJava) - } - }) - }catch{ - case ex: Exception => { - LOG.error("fail executing", ex) - _collector.fail(input) - throw new RuntimeException(ex) - } - } - _collector.ack(input) - } - - override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={ - val fields = worker.fields - LOG.info("output fields for worker " + worker + " : " + fields.toList) - declarer.declare(new Fields(fields:_*)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala deleted file mode 100644 index fd3e2e5..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormExecutorForAlertWrapper.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import java.util - -import com.typesafe.config.Config - -case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String) - extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{ - override def prepareConfig(config: Config): Unit = { - delegate.prepareConfig(config) - } - - override def init: Unit = { - delegate.init - } - - override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = { - delegate.flatMap(input, new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] { - override def collect(r: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit = { - collector.collect(Tuple3(r.f0, streamName, r.f1)) - } - }) - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala deleted file mode 100644 index be4ee81..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormSpoutFactory.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package eagle.datastream - -import java.util - -import backtype.storm.topology.base.BaseRichSpout -import com.typesafe.config.Config - -object StormSpoutFactory { - /** - * @param config context configuration - * @param sourceProducer source producer - * @return - */ - def createSpout(config: Config, sourceProducer: StormSourceProducer) : BaseRichSpout = { - val numFields = sourceProducer.numFields - if(numFields <= 0) { - sourceProducer.source - }else{ - var i = 0 - val ret = new util.ArrayList[String] - while(i < numFields){ - ret.add(OutputFieldNameConst.FIELD_PREFIX + i) - i += 1 - } - SpoutProxy(sourceProducer.source, ret) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala deleted file mode 100644 index 8fe9695..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAG.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package eagle.datastream - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.JavaConverters._ -import scala.collection.{JavaConversions, mutable} - -/** - * wrapper of DAG, used for storm topology compiler - */ -class StormStreamDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]) extends AbstractStreamProducerGraph { - var nodeMap: mutable.Map[String, StreamProducer] = null - - override def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: StreamConnector): Unit = { - graph.addEdge(from, to, streamConnector) - } - - override def addVertex(producer: StreamProducer): Unit = { - graph.addVertex(producer) - } - - override def iterator(): Iterator[StreamProducer] = { - JavaConversions.asScalaIterator(graph.iterator()) - } - - override def isSource(v: StreamProducer): Boolean = { - graph.inDegreeOf(v) match { - case 0 => true - case _ => false - } - } - - override def outgoingEdgesOf(v: StreamProducer): scala.collection.mutable.Set[StreamConnector] = { - JavaConversions.asScalaSet(graph.outgoingEdgesOf(v)) - } - - override def getNodeByName(name: String): Option[StreamProducer] = { - nodeMap.get(name) - } - - def setNodeMap(nodeMap: mutable.Map[String, StreamProducer]): Unit = { - this.nodeMap = nodeMap - } - - override def incomingVertexOf(v: StreamProducer): scala.collection.mutable.Set[StreamProducer] = { - val set = mutable.Set[StreamProducer]() - graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e)) - set - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala deleted file mode 100644 index 5966387..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormStreamDAGTransformer.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import scala.collection.mutable - -/** - * convert generic DAG data structure to Storm specific DAG data structure for easy topology compiler - */ -object StormStreamDAGTransformer { - /** - * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into StormStreamDAG - * - * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector] - * @return StormStreamDAG - */ - def transform(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) : StormStreamDAG = { - val stormDAG = new StormStreamDAG(dag) - val nodeMap = mutable.HashMap[String, StreamProducer]() - val iter = dag.iterator() - while(iter.hasNext){ - val sp = iter.next() - nodeMap.put(sp.name, sp) - } - stormDAG.setNodeMap(nodeMap) - stormDAG - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala deleted file mode 100644 index de6cc9f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyCompiler.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package eagle.datastream - -import java.util - -import backtype.storm.topology.base.BaseRichBolt -import backtype.storm.topology.{BoltDeclarer, TopologyBuilder} -import backtype.storm.tuple.Fields -import com.typesafe.config.Config -import org.slf4j.LoggerFactory - -case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{ - val LOG = LoggerFactory.getLogger(StormTopologyCompiler.getClass) - val boltCache = scala.collection.mutable.Map[StreamProducer, StormBoltWrapper]() - - override def buildTopology: AbstractTopologyExecutor ={ - val builder = new TopologyBuilder(); - val iter = graph.iterator() - val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]() - while(iter.hasNext){ - val from = iter.next() - val fromName = from.name - if(graph.isSource(from)){ - val spout = StormSpoutFactory.createSpout(config, from.asInstanceOf[StormSourceProducer]) - builder.setSpout(fromName, spout, from.parallelism) - LOG.info("Spout name : " + fromName + " with parallelism " + from.parallelism) - } else { - LOG.info("Bolt name:" + fromName) - } - - val edges = graph.outgoingEdgesOf(from) - edges.foreach(sc => { - val toName = sc.to.name - var boltDeclarer : BoltDeclarer = null - val toBolt = createBoltIfAbsent(toName) - boltDeclarerCache.get(toName) match{ - case None => { - var finalParallelism = 1 - graph.getNodeByName(toName) match { - case Some(p) => finalParallelism = p.parallelism - case None => finalParallelism = 1 - } - boltDeclarer = builder.setBolt(toName, toBolt, finalParallelism); - LOG.info("created bolt " + toName + " with parallelism " + finalParallelism) - boltDeclarerCache.put(toName, boltDeclarer) - } - case Some(bt) => boltDeclarer = bt - } - sc.groupByFields match{ - case Nil => boltDeclarer.shuffleGrouping(fromName) - case p => boltDeclarer.fieldsGrouping(fromName, new Fields(fields(p))) - } - LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields) - }) - } - new StormTopologyExecutorImpl(builder.createTopology, config) - } - - def fields(fields : Seq[Int]): java.util.List[String] ={ - val ret = new util.ArrayList[String] - fields.map(n => ret.add(OutputFieldNameConst.FIELD_PREFIX + n)) - ret - } - - def createBoltIfAbsent(name : String) : BaseRichBolt = { - val producer = graph.getNodeByName(name) - producer match{ - case Some(p) => createBoltIfAbsent(graph, p) - case None => throw new IllegalArgumentException("please check bolt name " + name) - } - } - - def createBoltIfAbsent(graph: AbstractStreamProducerGraph, producer : StreamProducer): BaseRichBolt ={ - boltCache.get(producer) match{ - case Some(bolt) => bolt - case None => { - StormBoltFactory.getBoltWrapper(graph, producer, config) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala deleted file mode 100644 index 656495d..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StormTopologyExecutorImpl.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -import backtype.storm.generated.StormTopology -import backtype.storm.utils.Utils -import backtype.storm.{Config, LocalCluster, StormSubmitter} -import storm.trident.spout.RichSpoutBatchExecutor - -case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor { - @throws(classOf[Exception]) - def execute { - val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local") - val conf: Config = new Config - conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024)) - conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8)) - conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32)) - conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384)) - conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384)) - - val topologyName = config.getString("envContextConfig.topologyName") - if (!localMode) { - StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology) - } - else { - val cluster: LocalCluster = new LocalCluster - cluster.submitTopology(topologyName, conf, topology) - Utils.sleep(Integer.MAX_VALUE) - cluster.killTopology(topologyName) - cluster.shutdown - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala deleted file mode 100644 index e6a4012..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAlertExpansion.scala +++ /dev/null @@ -1,192 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import java.util - -import com.typesafe.config.Config -import eagle.alert.dao.AlertDefinitionDAOImpl -import eagle.executor.AlertExecutorCreationUtils -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer - -/** - * The constraints for alert is: - * 1. only 3 StreamProducers can be put immediately before MapProducer, FlatMapProducer, StreamUnionProducer - * 2. For StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer - * 3. the output for MapProducer and FlatMapProducer is 2-field tuple, key and value, key is string, value has to be SortedMap - * 4. the framework will wrapper original MapProducer and FlatMapProducer to emit 3-field tuple, {key, streamName and value} - * 5. the framework will automatically partition traffic with first field - * - * - * 2 steps - * step 1: wrapper previous StreamProducer with one more field "streamName" - * step 2: partition alert executor by policy partitioner class - */ - -class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) { - val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit ={ - val iter = dag.iterator() - val toBeAddedEdges = new ListBuffer[StreamConnector] - val toBeRemovedVertex = new ListBuffer[StreamProducer] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child) - }) - } - // add back edges - toBeAddedEdges.foreach(e => { - dag.addVertex(e.from) - dag.addVertex(e.to) - dag.addEdge(e.from, e.to, e) - }) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } - - def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer], - dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = { - child match { - case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) => { - /** - * step 1: wrapper previous StreamProducer with one more field "streamName" - * for AlertStreamSink, we check previous StreamProducer and replace that - */ - val newStreamProducers = new ListBuffer[StreamProducer] - current match { - case StreamUnionProducer(id, others) => { - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0))) - var i: Int = 1 - others.foreach(o => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i)) - i += 1 - }) - } - case _: FlatMapProducer[AnyRef, AnyRef] => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0)) - } - case _: MapProducer => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0)) - } - case s: StreamProducer if dag.inDegreeOf(s) == 0 => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0)) - } - case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported") - } - - /** - * step 2: partition alert executor by policy partitioner class - */ - val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(config), upStreamNames, alertExecutorId) - var alertProducers = new scala.collection.mutable.MutableList[StreamProducer] - alertExecutors.foreach(exec => { - val t = FlatMapProducer(UniqueId.incrementAndGetId(), exec).withName(exec.getAlertExecutorId() + "_" + exec.getPartitionSeq()) - t.setConfig(config) - t.setGraph(dag) - alertProducers += t - newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0))) - }) - - // remove AlertStreamSink - toBeRemovedVertex += child - - // add alert consumer if necessary - if (withConsumer) { - AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList) - } - } - case _ => - } - } - - private def replace(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer], - dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, upStreamName: String) : StreamProducer= { - var newsp: StreamProducer = null - current match { - case _: FlatMapProducer[AnyRef, AnyRef] => { - val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper - mapper match { - case a: JavaStormStreamExecutor[EagleTuple] => { - val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName) - newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper) - newsp.setGraph(dag) - newsp.setConfig(config) - } - case b: StormStreamExecutor[EagleTuple] => { - val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName) - newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper) - newsp.setGraph(dag) - newsp.setConfig(config) - } - case _ => throw new IllegalArgumentException - } - // remove old StreamProducer and replace that with new StreamProducer - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp)) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to)) - toBeRemovedVertex += current - } - case _: MapProducer => { - val mapper = current.asInstanceOf[MapProducer].fn - val newfun: (AnyRef => AnyRef) = { - a => mapper(a) match { - case scala.Tuple2(x1, x2) => (x1, upStreamName, x2) - case _ => throw new IllegalArgumentException - } - } - current match { - case MapProducer(id, 2, fn) => newsp = MapProducer(UniqueId.incrementAndGetId(), 3, newfun) - case _ => throw new IllegalArgumentException - } - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp)) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to)) - toBeRemovedVertex += current - } - case s: StreamProducer if dag.inDegreeOf(s) == 0 => { - val fn:(AnyRef => AnyRef) = { - n => { - n match { - case scala.Tuple3 => n - case scala.Tuple2(x1,x2) => (x1,upStreamName,x2) - case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1) - case _ => (if(n == null) null else n.hashCode(),upStreamName,n) - } - } - } - newsp = MapProducer(UniqueId.incrementAndGetId(),3,fn) - toBeAddedEdges += StreamConnector(current,newsp) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to)) - } - case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink") - } - newsp - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala deleted file mode 100644 index 0a2e5ba..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamAppDSL.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream - -import backtype.storm.topology.base.BaseRichSpout -import com.typesafe.config._ -import eagle.dataproc.impl.storm.AbstractStormSpoutProvider -import eagle.dataproc.util.ConfigOptionParser - -import scala.reflect.runtime.universe._ - -/** - * @since 11/6/15 - */ -trait ConfigContext{ - def set(config:Config) - def config:Config - - def set[T<:AnyRef](key:String,value:T): Unit = { - set(config.withValue(key,ConfigValueFactory.fromAnyRef(value))) - } - - /** - * - * @param key config key - * @param default default value - * @tparam T return type - * @return - */ - def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = { - if(config.hasPath(key)) { - get(key) - } else default - } - - def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match { - case STRING_TYPE => config.getString(key).asInstanceOf[T] - case TypeTag.Double => config.getDouble(key).asInstanceOf[T] - case TypeTag.Long => config.getLong(key).asInstanceOf[T] - case TypeTag.Int => config.getInt(key).asInstanceOf[T] - case TypeTag.Byte => config.getBytes(key).asInstanceOf[T] - case TypeTag.Boolean => config.getBoolean(key).asInstanceOf[T] - case NUMBER_TYPE => config.getNumber(key).asInstanceOf[T] - case OBJECT_TYPE => config.getObject(key).asInstanceOf[T] - case VALUE_TYPE => config.getValue(key).asInstanceOf[T] - case ANY_REF_TYPE => config.getAnyRef(key).asInstanceOf[T] - case INT_LIST_TYPE => config.getIntList(key).asInstanceOf[T] - case DOUBLE_LIST_TYPE => config.getDoubleList(key).asInstanceOf[T] - case BOOL_LIST_TYPE => config.getBooleanList(key).asInstanceOf[T] - case LONG_LIST_TYPE => config.getLongList(key).asInstanceOf[T] - case _ => throw new UnsupportedOperationException(s"$tag is not supported yet") - } - - val STRING_TYPE = typeOf[String] - val NUMBER_TYPE = typeOf[Number] - val INT_LIST_TYPE = typeOf[List[Int]] - val BOOL_LIST_TYPE = typeOf[List[Boolean]] - val DOUBLE_LIST_TYPE = typeOf[List[Double]] - val LONG_LIST_TYPE = typeOf[List[Double]] - val OBJECT_TYPE = typeOf[ConfigObject] - val VALUE_TYPE = typeOf[ConfigValue] - val ANY_REF_TYPE = typeOf[AnyRef] -} - -/** - * Stream APP DSL - * @tparam E - */ -trait StreamApp[+E<:ExecutionEnvironment] extends App with ConfigContext{ - private var _executed = false - private var _config:Config = null - - override def config:Config = _config - - override def set(config:Config) = _config = config - - def env:E - - def execute() { - env.execute() - _executed = true - } - - override def main(args: Array[String]): Unit = { - _config = new ConfigOptionParser().load(args) - super.main(args) - if(!_executed) execute() - } -} - -trait StormStreamApp extends StreamApp[StormExecutionEnvironment]{ - private var _env:StormExecutionEnvironment = null - def source(sourceProvider: AbstractStormSpoutProvider) = { - val spout = sourceProvider.getSpout(config) - env.newSource(spout) - } - - def source(spout:BaseRichSpout) = env.newSource(spout) - - override def env:StormExecutionEnvironment = { - if(_env == null){ - _env = ExecutionEnvironmentFactory.getStorm(config) - } - _env - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala deleted file mode 100644 index ceeb411..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamConnector.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package eagle.datastream - -case class StreamConnector(from: StreamProducer, to: StreamProducer) { - var groupByFields : Seq[Int] = Nil - - def groupBy(fields : Seq[Int]) : StreamConnector = { - groupByFields = fields - this - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala deleted file mode 100644 index baa514e..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamDAGExpansion.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -abstract class StreamDAGExpansion(config: Config) { - def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala deleted file mode 100644 index 3bed891..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamGroupbyExpansion.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer - -/** - * Replace GroupByProducer(Vertex) with StreamConnector (Edge) - * @param config context configuration - */ -class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamGroupbyExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = { - val iter = dag.iterator() - var toBeAddedEdges = new ListBuffer[StreamConnector] - var toBeRemovedVertex = new ListBuffer[StreamProducer] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - child match { - case p : GroupByProducer => { - dag.outgoingEdgesOf(p).foreach(c2 => { - toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields) - }) - toBeRemovedVertex += p - } - case _ => - } - }) - } - - // add back edges - toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e)) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala deleted file mode 100644 index 40abfd8..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamNameExpansion.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -/** - * to set name for each StreamProducer - * 1. if name is given programatically, then use this name - * 2. otherwise use name generated by scala internally - */ -class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = { - val iter = dag.iterator() - while(iter.hasNext){ - val sp = iter.next() - sp.name = NodeNameSelector(sp).getName - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala deleted file mode 100644 index c3c4533..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamParallelismConfigExpansion.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import java.util.regex.Pattern - -import com.typesafe.config.{ConfigValue, ConfigObject, Config} -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory -import scala.collection.JavaConverters._ - -class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = { - val map = getParallelismMap(config) - val iter = dag.iterator() - while(iter.hasNext){ - val streamProducer = iter.next() - if(streamProducer.name != null) { - map.foreach(tuple => { - tuple._1.matcher(streamProducer.name).find() match { - case true => streamProducer.parallelism = tuple._2 - case false => - } - }) - } - } - } - - private def getParallelismMap(config: Config) : Map[Pattern, Int]= { - val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig") - LOG.info("Found parallelismConfig ? " + (if (parallelismConfig == null) "no" else "yes")) - parallelismConfig.asScala.toMap map { - case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int]) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala deleted file mode 100644 index 8485e28..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamProducer.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package eagle.datastream - -import java.util -import java.util.concurrent.atomic.AtomicInteger - -import backtype.storm.topology.base.BaseRichSpout -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ - -/** - * StreamProducer is the base class for all other concrete StreamProducer - * It defines high level API for user to organize data stream flow - * - * StreamProducer is independent of execution environment - */ - -trait StreamProducer{ - var name: String = null - var parallelism: Int = 1 - var graph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = null - var config: Config = null - - private def incrementAndGetId() = UniqueId.incrementAndGetId() - - def setGraph(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = this.graph = graph - def getGraph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = graph - def setConfig(config: Config) : Unit = this.config = config - def getConfig: Config = config - - def filter(fn : AnyRef => Boolean): StreamProducer ={ - val ret = FilterProducer(incrementAndGetId(), fn) - hookupDAG(graph, this, ret) - ret - } - - def flatMap[T, R](mapper : FlatMapper[T, R]) : StreamProducer = { - val ret = FlatMapProducer(incrementAndGetId(), mapper) - hookupDAG(graph, this, ret) - ret - } - - def map1(fn : AnyRef => AnyRef) : StreamProducer = { - val ret = MapProducer(incrementAndGetId(), 1, fn) - hookupDAG(graph, this, ret) - ret - } - - def map2(fn : AnyRef => AnyRef) : StreamProducer = { - val ret = MapProducer(incrementAndGetId(), 2, fn) - hookupDAG(graph, this, ret) - ret - } - - def map3(fn : AnyRef => AnyRef) : StreamProducer = { - val ret = MapProducer(incrementAndGetId(), 3, fn) - hookupDAG(graph, this, ret) - ret - } - - def map4(fn : AnyRef => AnyRef) : StreamProducer = { - val ret = MapProducer(incrementAndGetId(), 4, fn) - hookupDAG(graph, this, ret) - ret - } - - /** - * starting from 0, groupby operator would be upon edge of the graph - */ - def groupBy(fields : Int*) : StreamProducer = { - // validate each field index is greater or equal to 0 - fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0")) - val ret = GroupByProducer(incrementAndGetId(), fields) - hookupDAG(graph, this, ret) - ret - } - - //groupBy java version, starting from 1 - def groupBy(fields : java.util.List[Integer]) : StreamProducer = { - // validate each field index is greater or equal to 0 - fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0")) - val ret = GroupByProducer(incrementAndGetId(), fields.asScala.toSeq.asInstanceOf[Seq[Int]]) - hookupDAG(graph, this, ret) - ret - } - - def streamUnion(others : Seq[StreamProducer]) : StreamProducer = { - val ret = StreamUnionProducer(incrementAndGetId(), others) - hookupDAG(graph, this, ret) - ret - } - - /** - * alert is always sink of data flow - */ - def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String) = { - alert(upStreamNames, alertExecutorId, true) - } - - def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String) = { - alert(upStreamNames, alertExecutorId, false) - } - - def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) = { - val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer) - hookupDAG(graph, this, ret) - } - - def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={ - alert(util.Arrays.asList(upStreamName), alertExecutorId, true) - } - - def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={ - alert(util.Arrays.asList(upStreamName), alertExecutorId, false) - } - - def hookupDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, next: StreamProducer) = { - current.getGraph.addVertex(next) - current.getGraph.addEdge(current, next, StreamConnector(current, next)) - passOnContext(current, next) - } - - private def passOnContext(current: StreamProducer, next: StreamProducer): Unit ={ - next.graph = current.graph - next.config = current.config - } - - /** - * can be set by programatically or by configuration - */ - def withParallelism(parallelism : Int) : StreamProducer = { - this.parallelism = parallelism - this - } - - def withName(name : String) : StreamProducer = { - this.name = name - this - } -} - -case class FilterProducer(id: Int, fn : AnyRef => Boolean) extends StreamProducer - -case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends StreamProducer { - override def toString() = mapper.toString + "_" + id -} - -case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer - -case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer - -case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer - -case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamProducer{ - var numFields : Int = 0 - /** - * rename outputfields to f0, f1, f2, ... - * if one spout declare some field names, those fields names will be modified - * @param n - */ - def renameOutputFields(n : Int): StormSourceProducer ={ - this.numFields = n - this - } -} - -case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer - -object UniqueId{ - val id : AtomicInteger = new AtomicInteger(0); - def incrementAndGetId() : Int = { - id.incrementAndGet() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala deleted file mode 100644 index b68d213..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/StreamUnionExpansion.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package eagle.datastream - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer - -/** - * union operator should be expanded - */ -class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = { - val iter = dag.iterator() - var toBeAddedEdges = new ListBuffer[StreamConnector] - var toBeRemovedVertex = new ListBuffer[StreamProducer] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - val groupByFields = edge.groupByFields; - child match { - case StreamUnionProducer(id, others) => { - dag.outgoingEdgesOf(child).foreach(c2 => { - toBeAddedEdges += StreamConnector(current, c2.to).groupBy(groupByFields) - others.foreach(o => { - toBeAddedEdges += StreamConnector(o, c2.to).groupBy(groupByFields) - }) - }) - toBeRemovedVertex += child - } - case _ => - } - }) - } - - // add back edges - toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e)) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala deleted file mode 100644 index fe914f1..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/UnionUtils.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package eagle.datastream - -import java.util - -import scala.collection.JavaConverters._ - -object UnionUtils { - def join(producers : StreamProducer*) : StreamProducer = { - producers.head.streamUnion(producers.drop(1)) - } - - def join(producers : java.util.List[StreamProducer]) : StreamProducer = { - val newList = new util.ArrayList(producers) - val head = newList.get(0) - newList.remove(0) - head.streamUnion(newList.asScala); - } - - def join(producers : List[StreamProducer]) : StreamProducer = { - val head = producers.head - head.streamUnion(producers.tail); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala deleted file mode 100644 index 1e735c5..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/JsonMessageDeserializer.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream.kafka - -import java.io.IOException -import java.util -import java.util.Properties - -import com.fasterxml.jackson.databind.ObjectMapper -import eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer -import org.slf4j.{Logger, LoggerFactory} - -/** - * @since 11/6/15 - */ -case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDeserializer{ - private val objectMapper: ObjectMapper = new ObjectMapper - private val LOG: Logger = LoggerFactory.getLogger(classOf[JsonMessageDeserializer]) - - override def deserialize(bytes: Array[Byte]): AnyRef = { - var map: util.Map[String, _] = null - try { - map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]]) - } catch { - case e: IOException => { - LOG.error("Failed to deserialize json from: " + new String(bytes), e) - } - } - map - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala deleted file mode 100644 index bb8fb56..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/kafka/KafkaStreamMonitor.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.datastream.kafka - -import eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider -import eagle.datastream.StormStreamApp - -/** - * @since 11/6/15 - */ -class KafkaStreamMonitorApp extends StormStreamApp{ - val streamName = get[String]("eagle.stream.name","eventStream") - val streamExecutorId = get[String]("eagle.stream.executor",s"${streamName}Executor") - - set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName) - - source(new KafkaSourcedSpoutProvider).renameOutputFields(1).withName(streamName) - .alertWithConsumer(streamName, streamExecutorId) -} - -object KafkaStreamMonitor extends KafkaStreamMonitorApp \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala new file mode 100644 index 0000000..dc2c198 --- /dev/null +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractStreamProducerGraph.scala @@ -0,0 +1,29 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.eagle.datastream + +trait AbstractStreamProducerGraph { + def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: StreamConnector) + def addVertex(producer: StreamProducer) + def iterator() : Iterator[StreamProducer] + def isSource(v : StreamProducer) : Boolean + def outgoingEdgesOf(v : StreamProducer) : scala.collection.mutable.Set[StreamConnector] + def getNodeByName(name : String) : Option[StreamProducer] + def incomingVertexOf(v: StreamProducer) : scala.collection.mutable.Set[StreamProducer] +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala new file mode 100644 index 0000000..8c53ed5 --- /dev/null +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyCompiler.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.datastream + +trait AbstractTopologyCompiler{ + def buildTopology : AbstractTopologyExecutor +}