http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java deleted file mode 100644 index e9c4a7c..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.config; - -import java.io.Closeable; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; - - -/** - * Abstraction of asynchronized configuration management - * This is used for config change notification between processes, without this one process has to pull changes triggered by another process - * - * Config bus is similar to message bus, config change producer can publish config change(message) to config bus, - * while config change consumer can subscribe config change and do business logic in callback - * 1. use zookeeper as media to notify config consumer of config changes - * 2. each type of config is represented by topic - * 3. each config change can contain actual value or contain reference Id which consumer uses to retrieve actual value. This mechanism will reduce zookeeper overhed - * - */ -public class ConfigBusBase implements Closeable{ - protected String zkRoot; - protected CuratorFramework curator; - - public ConfigBusBase(ZKConfig config) { - this.zkRoot = config.zkRoot; - curator = CuratorFrameworkFactory.newClient( - config.zkQuorum, - config.zkSessionTimeoutMs, - config.connectionTimeoutMs, - new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval) - ); - curator.start(); - } - - @Override - public void close(){ - curator.close(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java deleted file mode 100644 index 9abfff5..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java +++ /dev/null @@ -1,57 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.config; - -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.slf4j.Logger; - -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * 1. When consumer is started, it always get notified of config - * 2. 
When config is changed, consumer always get notified of config change - * - * Reliability issue: - * TODO How to ensure config change message is always delivered to consumer - */ -public class ConfigBusConsumer extends ConfigBusBase { - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusConsumer.class); - - private NodeCache cache; - public ConfigBusConsumer(ZKConfig config, String topic, ConfigChangeCallback callback){ - super(config); - String zkPath = zkRoot + "/" + topic; - LOG.info("monitor change for zkPath " + zkPath); - cache = new NodeCache(curator, zkPath); - cache.getListenable().addListener( () -> - { - // get node value and notify callback - byte[] value = curator.getData().forPath(zkPath); - ObjectMapper mapper = new ObjectMapper(); - ConfigValue v = mapper.readValue(value, ConfigValue.class); - callback.onNewConfig(v); - } - ); - try { - cache.start(); - }catch(Exception ex){ - LOG.error("error start NodeCache listener", ex); - throw new RuntimeException(ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java deleted file mode 100644 index 8dcbee7..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java +++ /dev/null @@ -1,54 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.config; - -import org.apache.zookeeper.CreateMode; -import org.slf4j.Logger; - -import com.fasterxml.jackson.databind.ObjectMapper; - -public class ConfigBusProducer extends ConfigBusBase { - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusProducer.class); - - public ConfigBusProducer(ZKConfig config){ - super(config); - } - - /** - * @param topic - * @param config - */ - public void send(String topic, ConfigValue config){ - // check if topic exists, create this topic if not existing - String zkPath = zkRoot + "/" + topic; - try { - if (curator.checkExists().forPath(zkPath) == null) { - curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(zkPath); - } - ObjectMapper mapper = new ObjectMapper(); - byte[] content = mapper.writeValueAsBytes(config); - curator.setData().forPath(zkPath, content); - }catch(Exception ex){ - LOG.error("error creating zkPath " + zkPath, ex); - throw new RuntimeException(ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java deleted file mode 100644 index f3e09d3..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.config; - -public interface ConfigChangeCallback { - void onNewConfig(ConfigValue value); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java deleted file mode 100644 index 9774296..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java +++ /dev/null @@ -1,50 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
// - */ -package org.apache.eagle.alert.config; - - -
/**
 * Config body holding the data of one topic; it is serialized as JSON into ZooKeeper.
 *
 * <p>The value is either a version id that references data stored outside ZooKeeper, or the
 * actual config value itself; {@link #isValueVersionId()} tells which.
 */
public class ConfigValue {
    private boolean isValueVersionId;
    private Object value;

    /** @return the payload: a version id or the actual config value */
    public Object getValue() {
        return this.value;
    }

    /** @param value the payload: a version id or the actual config value */
    public void setValue(Object value) {
        this.value = value;
    }

    /** @return {@code true} when the value is a version id rather than the config itself */
    public boolean isValueVersionId() {
        return this.isValueVersionId;
    }

    /** @param valueVersionId {@code true} when the value is a version id */
    public void setValueVersionId(boolean valueVersionId) {
        this.isValueVersionId = valueVersionId;
    }

    /** @return a text rendering of both fields */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("isValueVersionId: ").append(isValueVersionId);
        text.append(", value: ").append(value);
        return text.toString();
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java deleted file mode 100644 index d215d11..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java +++ /dev/null @@ -1,34 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
// You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.config; - -import java.io.Serializable; - -
/**
 * In-memory representation of the key ZooKeeper settings.
 *
 * <p>A plain serializable holder with public fields and no validation.
 */
public class ZKConfig implements Serializable {

    private static final long serialVersionUID = -1287231022807492775L;

    // --- where to connect ---

    /** ZooKeeper quorum string. */
    public String zkQuorum;

    /** Root ZooKeeper path. */
    public String zkRoot;

    // --- timeouts ---

    /** ZooKeeper session timeout, in milliseconds. */
    public int zkSessionTimeoutMs;

    /** Connection timeout, in milliseconds. */
    public int connectionTimeoutMs;

    // --- retry policy ---

    /** Number of retries. */
    public int zkRetryTimes;

    /** Retry interval. NOTE(review): unit is not stated here, presumably milliseconds; confirm. */
    public int zkRetryInterval;
}
// http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java deleted file mode 100644 index ba009e6..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.config; - -import com.typesafe.config.Config; - -/** - * Since 4/28/16. - */ -public class ZKConfigBuilder { - public static ZKConfig getZKConfig(Config config){ - ZKConfig zkConfig = new ZKConfig(); - zkConfig.zkQuorum = config.getString("zkConfig.zkQuorum"); - zkConfig.zkRoot = config.getString("zkConfig.zkRoot"); - zkConfig.zkSessionTimeoutMs = config.getInt("zkConfig.zkSessionTimeoutMs"); - zkConfig.connectionTimeoutMs = config.getInt("zkConfig.connectionTimeoutMs"); - zkConfig.zkRetryTimes = config.getInt("zkConfig.zkRetryTimes"); - zkConfig.zkRetryInterval = config.getInt("zkConfig.zkRetryInterval"); - return zkConfig; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java deleted file mode 100644 index 83d307c..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * The alert specification for topology bolts. 
- * - * @since Apr 29, 2016 - */ -public class AlertBoltSpec { - private String version; - private String topologyName; - - // mapping from boltId to list of PolicyDefinitions - @JsonIgnore - private Map<String, List<PolicyDefinition>> boltPoliciesMap = new HashMap<String, List<PolicyDefinition>>(); - - // mapping from boltId to list of PolicyDefinition's Ids - private Map<String, List<String>> boltPolicyIdsMap = new HashMap<String, List<String>>(); - - public AlertBoltSpec() { - } - - public AlertBoltSpec(String topo) { - this.topologyName = topo; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public String getTopologyName() { - return topologyName; - } - - public void setTopologyName(String topologyName) { - this.topologyName = topologyName; - } - -// public List<PolicyDefinition> getBoltPolicy(String boltId) { -// return boltPoliciesMap.get(boltId); -// } -// -// public void addBoltPolicy(String boltId, PolicyDefinition pd) { -// if (boltPoliciesMap.containsKey(boltId)) { -// boltPoliciesMap.get(boltId).add(pd); -// } else { -// List<PolicyDefinition> list = new ArrayList<PolicyDefinition>(); -// boltPoliciesMap.put(boltId, list); -// list.add(pd); -// } -// } - - public void addBoltPolicy(String boltId, String policyName) { - if (boltPolicyIdsMap.containsKey(boltId)) { - boltPolicyIdsMap.get(boltId).add(policyName); - } else { - List<String> list = new ArrayList<String>(); - boltPolicyIdsMap.put(boltId, list); - list.add(policyName); - } - } - - @JsonIgnore - public Map<String, List<PolicyDefinition>> getBoltPoliciesMap() { - return boltPoliciesMap; - } - - @JsonIgnore - public void setBoltPoliciesMap(Map<String, List<PolicyDefinition>> boltPoliciesMap) { - this.boltPoliciesMap = boltPoliciesMap; - } - - public Map<String, List<String>> getBoltPolicyIdsMap() { - return boltPolicyIdsMap; - } - - public void setBoltPolicyIdsMap(Map<String, List<String>> boltPolicyIdsMap) { 
- this.boltPolicyIdsMap = boltPolicyIdsMap; - } - - @Override - public String toString() { - return String.format("version:%s-topo:%s, boltPolicyIdsMap %s", version, topologyName, boltPolicyIdsMap); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java deleted file mode 100644 index 6c4f576..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordination.model; - -import java.util.Map; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import com.google.common.base.Objects; - -/** - * @since Apr 5, 2016 - * this metadata model controls how to convert kafka topic into tuple stream - */ -public class Kafka2TupleMetadata { - private String type; - private String name; // data source name - private Map<String, String> properties; - private String topic; - private String schemeCls; - - private Tuple2StreamMetadata codec; - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getName() { - return name; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public void setSchemeCls(String schemeCls) { - this.schemeCls = schemeCls; - } - - public void setName(String name) { - this.name = name; - } - - public Map<String, String> getProperties() { - return properties; - } - - public void setProperties(Map<String, String> properties) { - this.properties = properties; - } - - public Tuple2StreamMetadata getCodec() { - return codec; - } - - public void setCodec(Tuple2StreamMetadata codec) { - this.codec = codec; - } - - public String getTopic() { - return this.topic; - } - public String getSchemeCls() { - return this.schemeCls; - } - - public int hashCode() { - return new HashCodeBuilder().append(name).append(type).build(); - } - - public boolean equals(Object obj) { - if (!(obj instanceof Kafka2TupleMetadata)) { - return false; - } - Kafka2TupleMetadata o = (Kafka2TupleMetadata) obj; - return Objects.equal(name, o.name) && Objects.equal(type, o.type); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java deleted file mode 100644 index 43780f3..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -public class PolicyWorkerQueue { - - private StreamPartition partition; - private List<WorkSlot> workers; - - public PolicyWorkerQueue() { - workers = new ArrayList<>(); - } - - public PolicyWorkerQueue(List<WorkSlot> workers) { - this.workers = workers; - } - - public PolicyWorkerQueue(StreamPartition partition, List<WorkSlot> workers) { - this.workers = workers; - this.partition = partition; - } - - public StreamPartition getPartition() { - return partition; - } - - public void setPartition(StreamPartition partition) { - this.partition = partition; - } - - public List<WorkSlot> getWorkers() { - return workers; - } - - public void setWorkers(List<WorkSlot> workers) { - this.workers = workers; - } - - public String toString() { - return "[" + StringUtils.join(workers, ",") + "]"; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java deleted file mode 100644 index 06e819a..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.eagle.alert.engine.coordinator.Publishment; - -/** - * - * @since May 1, 2016 - * - */ -public class PublishSpec { - - private String topologyName; - // actually only publish spec for one topology - private String boltId; - private String version; - - private List<Publishment> publishments = new ArrayList<Publishment>(); - - public PublishSpec() { - } - - public PublishSpec(String topoName, String boltId) { - this.topologyName = topoName; - this.boltId = boltId; - } - - public void addPublishment(Publishment p) { - this.publishments.add(p); - } - - public String getTopologyName() { - return topologyName; - } - - public String getBoltId() { - return boltId; - } - - public List<Publishment> getPublishments() { - return publishments; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public void setTopologyName(String topologyName) { - this.topologyName = topologyName; - } - - public void setBoltId(String boltId) { - this.boltId = boltId; - } - - public void setPublishments(List<Publishment> publishments) { - 
this.publishments = publishments; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java deleted file mode 100644 index 5241920..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.List; - -import org.codehaus.jackson.annotate.JsonIgnore; - -/** - * @since Apr 29, 2016 - * - */ -public class RouterSpec { - private String version; - private String topologyName; - - private List<StreamRouterSpec> routerSpecs; - - public RouterSpec() { - routerSpecs = new ArrayList<StreamRouterSpec>(); - } - - public RouterSpec(String topoName) { - this(); - this.topologyName = topoName; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public String getTopologyName() { - return topologyName; - } - - public void setTopologyName(String topologyName) { - this.topologyName = topologyName; - } - - @JsonIgnore - public void addRouterSpec(StreamRouterSpec routerSpec) { - routerSpecs.add(routerSpec); - } - - public List<StreamRouterSpec> getRouterSpecs() { - return routerSpecs; - } - - public void setRouterSpecs(List<StreamRouterSpec> routerSpecs) { - this.routerSpecs = routerSpecs; - } - - @Override - public String toString() { - return String.format("version:%s-topo:%s, boltSpec:%s", version, topologyName, routerSpecs); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java deleted file mode 100644 index 6036f29..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** - * A global wise of schedule status <br/> - * <br/> - * TODO/FIXME: The persistence simply deserial ScheduleState to Json. One - * concern is that this string might become too big for store. <br/> - * <br/> - * The solution is in metadata resource, have specs/monitoredStreams/policy - * assignments stored in different table/collections with tage version. 
- * - * - * @since Apr 26, 2016 - * - */ -public class ScheduleState { - - // ScheduleSpec - private Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>(); - private Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>(); - private Map<String, RouterSpec> groupSpecs = new HashMap<String, RouterSpec>(); - private Map<String, PublishSpec> publishSpecs = new HashMap<String, PublishSpec>(); - - // ScheduleSnapshot - private List<VersionedPolicyDefinition> policySnapshots = new ArrayList<VersionedPolicyDefinition>(); - private List<VersionedStreamDefinition> streamSnapshots = new ArrayList<VersionedStreamDefinition>(); - - // ScheduleResult - private List<MonitoredStream> monitoredStreams = new ArrayList<MonitoredStream>(); - private List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>(); - - private String version; - // FIXME : should be date, can not make it simple in mongo.. - private String generateTime; - private int code = 200; - private String message = "OK"; - - public ScheduleState() { - } - - public ScheduleState(String version, - Map<String, SpoutSpec> topoSpoutSpecsMap, - Map<String, RouterSpec> groupSpecsMap, - Map<String, AlertBoltSpec> alertSpecsMap, - Map<String, PublishSpec> pubMap, - Collection<PolicyAssignment> assignments, - Collection<MonitoredStream> monitoredStreams, - Collection<PolicyDefinition> definitions, - Collection<StreamDefinition> streams) { - this.spoutSpecs = topoSpoutSpecsMap; - this.groupSpecs = groupSpecsMap; - this.alertSpecs = alertSpecsMap; - this.publishSpecs = pubMap; - this.version = version; - this.generateTime = String.valueOf(new Date().getTime()); - this.assignments = new ArrayList<PolicyAssignment>(assignments); - this.monitoredStreams = new ArrayList<MonitoredStream>(monitoredStreams); - this.policySnapshots = new ArrayList<VersionedPolicyDefinition>(); - this.streamSnapshots = new ArrayList<VersionedStreamDefinition>(); - - for (SpoutSpec ss : 
this.spoutSpecs.values()) { - ss.setVersion(version); - } - - for (RouterSpec ss : this.groupSpecs.values()) { - ss.setVersion(version); - } - - for (AlertBoltSpec ss : this.alertSpecs.values()) { - ss.setVersion(version); - } - - for (PublishSpec ps : this.publishSpecs.values()) { - ps.setVersion(version); - } - - for (MonitoredStream ms : this.monitoredStreams) { - ms.setVersion(version); - } - for (PolicyAssignment ps : this.assignments) { - ps.setVersion(version); - } - for (PolicyDefinition def : definitions) { - this.policySnapshots.add(new VersionedPolicyDefinition(version, def)); - } - for (StreamDefinition sd :streams) { - this.streamSnapshots.add(new VersionedStreamDefinition(version, sd)); - } - } - - public Map<String, SpoutSpec> getSpoutSpecs() { - return spoutSpecs; - } - - public void setSpoutSpecs(Map<String, SpoutSpec> spoutSpecs) { - this.spoutSpecs = spoutSpecs; - } - - public Map<String, AlertBoltSpec> getAlertSpecs() { - return alertSpecs; - } - - public void setAlertSpecs(Map<String, AlertBoltSpec> alertSpecs) { - this.alertSpecs = alertSpecs; - } - - public Map<String, RouterSpec> getGroupSpecs() { - return groupSpecs; - } - - public void setGroupSpecs(Map<String, RouterSpec> groupSpecs) { - this.groupSpecs = groupSpecs; - } - - public Map<String, PublishSpec> getPublishSpecs() { - return publishSpecs; - } - - public void setPublishSpecs(Map<String, PublishSpec> publishSpecs) { - this.publishSpecs = publishSpecs; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public int getCode() { - return code; - } - - public void setCode(int code) { - this.code = code; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public String getGenerateTime() { - return generateTime; - } - - public void setGenerateTime(String generateTime) { - this.generateTime = generateTime; - } - - 
public List<MonitoredStream> getMonitoredStreams() { - return monitoredStreams; - } - - public List<PolicyAssignment> getAssignments() { - return assignments; - } - - public List<VersionedPolicyDefinition> getPolicySnapshots() { - return policySnapshots; - } - - public void setPolicySnapshots(List<VersionedPolicyDefinition> policySnapshots) { - this.policySnapshots = policySnapshots; - } - - public void setMonitoredStreams(List<MonitoredStream> monitoredStreams) { - this.monitoredStreams = monitoredStreams; - } - - public void setAssignments(List<PolicyAssignment> assignments) { - this.assignments = assignments; - } - - public List<VersionedStreamDefinition> getStreamSnapshots() { - return streamSnapshots; - } - - public void setStreamSnapshots(List<VersionedStreamDefinition> streamSnapshots) { - this.streamSnapshots = streamSnapshots; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java deleted file mode 100644 index a197858..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - - -/** - * SpoutSpec metadata control 3 phases for data transformation for one specific topic - * phase 1: kafka topic to tuple, controlled by Kafka2TupleMetadata, i.e. Scheme - * phase 2: tuple to stream, controlled by Tuple2StreamMetadata, i.e. stream name selector etc. - * phase 3: stream repartition, controlled by StreamRepartitionMetadata, i.e. groupby spec - * @since Apr 18, 2016 - * - */ -public class SpoutSpec { - private String version; - -// private String spoutId; - private String topologyId; - - // topicName -> kafka2TupleMetadata - private Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<String, Kafka2TupleMetadata>(); - // topicName -> Tuple2StreamMetadata - private Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap = new HashMap<String, Tuple2StreamMetadata>(); - // topicName -> list of StreamRepartitionMetadata, here it is list because one topic(data source) may spawn multiple streams. 
- private Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap = new HashMap<String, List<StreamRepartitionMetadata>>(); - - public SpoutSpec(){} - - public SpoutSpec( - String topologyId, -// String spoutId, - Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap, - Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap, - Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap) { - this.topologyId = topologyId; -// this.spoutId = spoutId; - this.streamRepartitionMetadataMap = streamRepartitionMetadataMap; - this.tuple2StreamMetadataMap = tuple2StreamMetadataMap; - this.kafka2TupleMetadataMap = kafka2TupleMetadataMap; - } - -// public String getSpoutId() { -// return spoutId; -// } -// public void setSpoutId(String spoutId) { -// this.spoutId = spoutId; -// } - - public String getTopologyId() { - return topologyId; - } - - public Map<String, List<StreamRepartitionMetadata>> getStreamRepartitionMetadataMap() { - return streamRepartitionMetadataMap; - } - - public Map<String, Tuple2StreamMetadata> getTuple2StreamMetadataMap(){ - return this.tuple2StreamMetadataMap; - } - - public Map<String, Kafka2TupleMetadata> getKafka2TupleMetadataMap() { - return kafka2TupleMetadataMap; - } - - @org.codehaus.jackson.annotate.JsonIgnore - public StreamRepartitionMetadata getStream(String streamName) { - for (List<StreamRepartitionMetadata> meta : this.streamRepartitionMetadataMap.values()) { - Optional<StreamRepartitionMetadata> m = meta.stream().filter((t) -> t.getStreamId().equalsIgnoreCase(streamName)).findFirst(); - if (m.isPresent()) { - return m.get(); - } - } - return null; - } - - public void setTopologyId(String topologyId) { - this.topologyId = topologyId; - } - - public void setKafka2TupleMetadataMap(Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap) { - this.kafka2TupleMetadataMap = kafka2TupleMetadataMap; - } - - public void setTuple2StreamMetadataMap(Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap) { - 
this.tuple2StreamMetadataMap = tuple2StreamMetadataMap; - } - - public void setStreamRepartitionMetadataMap(Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap) { - this.streamRepartitionMetadataMap = streamRepartitionMetadataMap; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - @Override - public String toString() { - return String.format("version:%s-topo:%s ", version, this.topologyId); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java deleted file mode 100644 index bc7952c..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * This metadata controls how to figure out the stream name from an incoming tuple.
 */
public interface StreamNameSelector {
    /**
     * Picks the stream name for one tuple.
     *
     * @param tuple the tuple as a field-name to value mapping
     * @return the stream name selected for this tuple
     */
    String getStreamName(Map<String, Object> tuple);
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.List; - -import org.codehaus.jackson.annotate.JsonIgnore; - -/** - * @since Apr 25, 2016 - * This meta-data controls how tuple streamId is repartitioned - */ -public class StreamRepartitionMetadata { - private String topicName; - private String streamId; - /** - * each stream may have multiple different grouping strategies,for example groupby some fields or even shuffling - */ - public List<StreamRepartitionStrategy> groupingStrategies = new ArrayList<StreamRepartitionStrategy>(); - - public StreamRepartitionMetadata(){} - - public StreamRepartitionMetadata(String topicName, String stream) { - this.topicName = topicName; - this.streamId = stream; - } - - public String getStreamId() { - return streamId; - } - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public String getTopicName() { - return topicName; - } - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public List<StreamRepartitionStrategy> getGroupingStrategies() { - return groupingStrategies; - } - - @JsonIgnore - public void addGroupStrategy(StreamRepartitionStrategy gs) { - this.groupingStrategies.add(gs); - } - - public void setGroupingStrategies(List<StreamRepartitionStrategy> groupingStrategies) { - this.groupingStrategies = groupingStrategies; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java deleted file mode 100644 index 203114e..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; - -public class StreamRepartitionStrategy { - public StreamPartition partition ; - - public int numTotalParticipatingRouterBolts = 0; // how many group-by bolts participate policy evaluation - public int startSequence = 0; // what is the sequence for the first bolt in this topology among all bolts - public List<String> totalTargetBoltIds = new ArrayList<String>(); - - public int hashCode() { - int hashcode = 1 * 31; - hashcode += partition.hashCode(); - for (String str : totalTargetBoltIds) { - hashcode += str.hashCode(); - } - return hashcode; - } - - public boolean equals(Object obj) { - if (!(obj instanceof StreamRepartitionStrategy)) { - return false; - } - StreamRepartitionStrategy o = (StreamRepartitionStrategy) obj; - return partition.equals(o.partition) - && CollectionUtils.isEqualCollection(totalTargetBoltIds, o.totalTargetBoltIds); - } - - public StreamPartition getPartition() { - return partition; - } - - public void setPartition(StreamPartition partition) { - this.partition = partition; - } - - public int getNumTotalParticipatingRouterBolts() { - return numTotalParticipatingRouterBolts; - } - - public void setNumTotalParticipatingRouterBolts(int numTotalParticipatingRouterBolts) { - this.numTotalParticipatingRouterBolts = numTotalParticipatingRouterBolts; - } - - public int getStartSequence() { - return startSequence; - } - - public void setStartSequence(int startSequence) { - this.startSequence = startSequence; - } - - public List<String> getTotalTargetBoltIds() { - return totalTargetBoltIds; - } - - public void setTotalTargetBoltIds(List<String> totalTargetBoltIds) { - this.totalTargetBoltIds = totalTargetBoltIds; - } - -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java deleted file mode 100644 index 773ae56..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordination.model; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; - -/** - * One RouteSpec means one rule mapping [streamId -> StreamPartition -> - * PolicyExecutionQueue] - * - * Key is StreamPartition - */ -public class StreamRouterSpec { - private String streamId; - private StreamPartition partition; // The meta-data to build - // StreamPartitioner - private List<PolicyWorkerQueue> targetQueue = new ArrayList<PolicyWorkerQueue>(); - - public StreamPartition getPartition() { - return partition; - } - - public void setPartition(StreamPartition partition) { - this.partition = partition; - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(this.streamId).append(this.partition).append(targetQueue).build(); - } - - public List<PolicyWorkerQueue> getTargetQueue() { - return targetQueue; - } - - public void addQueue(PolicyWorkerQueue queue) { - this.targetQueue.add(queue); - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public void setTargetQueue(List<PolicyWorkerQueue> targetQueue) { - this.targetQueue = targetQueue; - } - - @Override - public String toString() { - return String.format("StreamRouterSpec[streamId=%s,partition=%s, queue=[%s]]", this.getStreamId(), - this.getPartition(), this.getTargetQueue()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java deleted file mode 100644 index dc9d1ba..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordination.model; - -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Convert incoming tuple to stream - * incoming tuple consists of 2 fields, topic and map of key/value - * output stream consists of 3 fields, stream name, timestamp, and map of key/value - */ -public class Tuple2StreamConverter { - private static final Logger LOG = LoggerFactory.getLogger(Tuple2StreamConverter.class); - private Tuple2StreamMetadata metadata; - private StreamNameSelector cachedSelector; - public Tuple2StreamConverter(Tuple2StreamMetadata metadata){ - this.metadata = metadata; - try { - cachedSelector = (StreamNameSelector)Class.forName(metadata.getStreamNameSelectorCls()). - getConstructor(Properties.class). - newInstance(metadata.getStreamNameSelectorProp()); - }catch(Exception ex){ - LOG.error("error initializing StreamNameSelector object", ex); - throw new IllegalStateException(ex); - } - } - - /** - * Assume tuple is composed of topic + map of key/value - * @param tuple - * @return - */ - @SuppressWarnings({ "unchecked" }) - public List<Object> convert(List<Object> tuple){ - Map<String, Object> m = (Map<String, Object>)tuple.get(1); - String streamName = cachedSelector.getStreamName(m); - if(!metadata.getActiveStreamNames().contains(streamName)) { - if(LOG.isDebugEnabled()) { - LOG.debug("streamName {} is not within activeStreamNames {}", streamName, metadata.getActiveStreamNames()); - } - return null; - } - - Object timeObject = m.get(metadata.getTimestampColumn()); - long timestamp = 0L; - if(timeObject instanceof Number){ - timestamp = ((Number) timeObject).longValue(); - }else{ - String timestampFieldValue = (String) m.get(metadata.getTimestampColumn()); - try { - SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat()); - timestamp = 
sdf.parse(timestampFieldValue).getTime(); - } catch (Exception ex) { - LOG.error("continue with current timestamp because error happens while parsing timestamp column " + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat()); - timestamp = System.currentTimeMillis(); - } - } - return Arrays.asList(tuple.get(0), streamName, timestamp, tuple.get(1)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java deleted file mode 100644 index bde4fe3..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * This metadata controls how a tuple is transformed to a stream.
 *
 * <p>For example, raw data consists of {"metric" : "cpuUsage", "host" :
 * "xyz.com", "timestamp" : 1346846400, "value" : "0.9"}; field "metric" is
 * used for creating the stream name, so here "cpuUsage" is the stream name.
 *
 * <p>metric could be "cpuUsage", "diskUsage", "memUsage" etc., so
 * activeStreamNames are a subset of all metric names. All other messages which
 * are not one of activeStreamNames will be filtered out.
 *
 * @since 4/25/16
 */
public class Tuple2StreamMetadata {
    /**
     * Only messages belonging to activeStreamNames will be kept while
     * transforming tuple into stream.
     */
    private Set<String> activeStreamNames = new HashSet<>();

    // class name and properties of the specific stream name selector
    private String streamNameSelectorCls;
    private Properties streamNameSelectorProp;

    // which field carries the timestamp, and the format it is written in
    private String timestampColumn;
    private String timestampFormat;

    /** @return the live (not copied) set of active stream names; empty by default */
    public Set<String> getActiveStreamNames() {
        return this.activeStreamNames;
    }

    /** @param activeStreamNames the set that replaces the current one; stored by reference */
    public void setActiveStreamNames(Set<String> activeStreamNames) {
        this.activeStreamNames = activeStreamNames;
    }

    /** @return the stream name selector class name, or {@code null} if unset */
    public String getStreamNameSelectorCls() {
        return this.streamNameSelectorCls;
    }

    /** @param streamNameSelectorCls the stream name selector class name */
    public void setStreamNameSelectorCls(String streamNameSelectorCls) {
        this.streamNameSelectorCls = streamNameSelectorCls;
    }

    /** @return the stream name selector properties, or {@code null} if unset */
    public Properties getStreamNameSelectorProp() {
        return this.streamNameSelectorProp;
    }

    /** @param streamNameSelectorProp the stream name selector properties; stored by reference */
    public void setStreamNameSelectorProp(Properties streamNameSelectorProp) {
        this.streamNameSelectorProp = streamNameSelectorProp;
    }

    /** @return the timestamp column name, or {@code null} if unset */
    public String getTimestampColumn() {
        return this.timestampColumn;
    }

    /** @param timestampColumn the timestamp column name */
    public void setTimestampColumn(String timestampColumn) {
        this.timestampColumn = timestampColumn;
    }

    /** @return the timestamp format, or {@code null} if unset */
    public String getTimestampFormat() {
        return this.timestampFormat;
    }

    /** @param timestampFormat the timestamp format */
    public void setTimestampFormat(String timestampFormat) {
        this.timestampFormat = timestampFormat;
    }

}
- */ -package org.apache.eagle.alert.coordination.model; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; - -/** - * @since May 25, 2016 - * - */ -public class VersionedPolicyDefinition { - private String version; - private PolicyDefinition definition; - - public VersionedPolicyDefinition() { - } - - public VersionedPolicyDefinition(String version, PolicyDefinition def) { - this.version = version; - this.definition = def; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public PolicyDefinition getDefinition() { - return definition; - } - - public void setDefinition(PolicyDefinition definition) { - this.definition = definition; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java deleted file mode 100644 index 2770aa1..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model; - -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** - * @since May 25, 2016 - * - */ -public class VersionedStreamDefinition { - private String version; - private StreamDefinition definition; - - public VersionedStreamDefinition() { - } - - public VersionedStreamDefinition(String version, StreamDefinition def) { - this.version = version; - this.definition = def; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public StreamDefinition getDefinition() { - return definition; - } - - public void setDefinition(StreamDefinition definition) { - this.definition = definition; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java deleted file mode 100644 index 96016f4..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
/**
 * A slot is simply a bolt; it is described here by a topology name and a bolt id.
 */
public class WorkSlot {

    // Both fields are public in addition to having accessors; kept that way for callers.
    public String topologyName;
    public String boltId;

    /** Creates a slot with both fields left {@code null}. */
    public WorkSlot() {
    }

    /**
     * @param topo   value stored as the topology name
     * @param boltId value stored as the bolt id
     */
    public WorkSlot(String topo, String boltId) {
        this.boltId = boltId;
        this.topologyName = topo;
    }

    /**
     * @return the topology name, possibly {@code null}
     */
    public String getTopologyName() {
        return this.topologyName;
    }

    /**
     * @param topologyName the topology name to store
     */
    public void setTopologyName(String topologyName) {
        this.topologyName = topologyName;
    }

    /**
     * @return the bolt id, possibly {@code null}
     */
    public String getBoltId() {
        return this.boltId;
    }

    /**
     * @param boltId the bolt id to store
     */
    public void setBoltId(String boltId) {
        this.boltId = boltId;
    }

    /**
     * Renders the slot as {@code (topologyName:boltId)}; unset fields print as "null".
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("(").append(topologyName).append(":").append(boltId).append(")");
        return text.toString();
    }
}
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model.internal; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -/** - * A monitored stream is the unique data set in the system. - * - * It's a combination of stream and the specific grp-by on it. - * - * For correlation stream, it means multiple stream for a given monitored stream. 
- * - * - * @since Apr 27, 2016 - * - */ -public class MonitoredStream { - - private String version; - - // the stream group that this monitored stream stands for - private StreamGroup streamGroup = new StreamGroup(); - private List<StreamWorkSlotQueue> queues = new ArrayList<StreamWorkSlotQueue>(); - - public MonitoredStream() { - } - - public MonitoredStream(StreamGroup par) { - this.streamGroup = par; - } - - public StreamGroup getStreamGroup() { - return streamGroup; - } - - public List<StreamWorkSlotQueue> getQueues() { - return queues; - } - - public synchronized void addQueues(StreamWorkSlotQueue queue) { - queues.add(queue); - } - - public synchronized boolean removeQueue(StreamWorkSlotQueue queue) { - return this.queues.remove(queue); - } - - public int hashCode() { - return new HashCodeBuilder().append(streamGroup).build(); - } - - public boolean equals(Object other) { - if (!(other instanceof MonitoredStream)) { - return false; - } - MonitoredStream o = (MonitoredStream) other; - return Objects.equals(streamGroup, o.streamGroup); - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public void setQueues(List<StreamWorkSlotQueue> queues) { - this.queues = queues; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java deleted file mode 100644 index 3e956ca..0000000 --- 
/**
 * Monitor metadata: records a policy name together with a queue id, plus a version string.
 *
 * @since Apr 27, 2016
 */
public class PolicyAssignment {

    private String version;

    private String policyName;
    private String queueId;

    /** Creates an assignment with all fields left {@code null}. */
    public PolicyAssignment() {
    }

    /**
     * @param policyName the policy name to store
     * @param queueId    the queue id to store
     */
    public PolicyAssignment(String policyName, String queueId) {
        this.queueId = queueId;
        this.policyName = policyName;
    }

    /**
     * @return the policy name, possibly {@code null}
     */
    public String getPolicyName() {
        return this.policyName;
    }

    /**
     * @return the queue id, possibly {@code null}
     */
    public String getQueueId() {
        return this.queueId;
    }

    /**
     * @param policyName the policy name to store
     */
    public void setPolicyName(String policyName) {
        this.policyName = policyName;
    }

    /**
     * @param queueId the queue id to store
     */
    public void setQueueId(String queueId) {
        this.queueId = queueId;
    }

    /**
     * @return the version string, possibly {@code null}
     */
    public String getVersion() {
        return this.version;
    }

    /**
     * @param version the version string to store
     */
    public void setVersion(String version) {
        this.version = version;
    }

    /**
     * Human-readable summary of the policy name, queue id and version; unset fields
     * print as "null".
     */
    @Override
    public String toString() {
        return "PolicyAssignment of policy " + policyName
            + ", queueId " + queueId
            + ", version " + version + " !";
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java deleted file mode 100644 index 9cbd841..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordination.model.internal; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.base.Objects; - -/** - * @since May 6, 2016 - * - */ -public class StreamGroup { - - private List<StreamPartition> streamPartitions = new ArrayList<StreamPartition>(); - - public StreamGroup() { - } - - public List<StreamPartition> getStreamPartitions() { - return streamPartitions; - } - - public void addStreamPartition(StreamPartition sp) { - this.streamPartitions.add(sp); - } - - public void addStreamPartitions(List<StreamPartition> sps) { - this.streamPartitions.addAll(sps); - } - - @org.codehaus.jackson.annotate.JsonIgnore - @JsonIgnore - public String getStreamId() { - StringBuilder sb = new StringBuilder("SG["); - for (StreamPartition sp : streamPartitions) { - sb.append(sp.getStreamId()).append("-"); - } - sb.append("]"); - return sb.toString(); - } - - @Override - public int hashCode() { - // implicitly all groups in stream groups will be built for hash code - return new HashCodeBuilder().append(streamPartitions).build(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof StreamGroup)) { - return false; - } - StreamGroup o = (StreamGroup) obj; - return Objects.equal(this.streamPartitions, o.streamPartitions); - } - - @Override - public String toString() { - return String.format("StreamGroup partitions=: %s ", streamPartitions); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java deleted file mode 100644 index fab6217..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordination.model.internal; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.WorkSlot; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * A work queue for given one monitored stream. - * - * Analog to storm's "tasks for given bolt". - * - * @since Apr 27, 2016 - * - */ -public class StreamWorkSlotQueue { - private String queueId; - - private final List<WorkSlot> workingSlots = new LinkedList<WorkSlot>(); - private boolean dedicated; - // some dedicated option, like dedicated userId/tenantId/policyId. 
- private Map<String, Object> dedicateOption; - - private int numberOfGroupBolts; - private Map<String, Integer> topoGroupStartIndex = new HashMap<String, Integer>(); - - public StreamWorkSlotQueue() { - } - - public StreamWorkSlotQueue(StreamGroup par, boolean isDedicated, Map<String, Object> options, - List<WorkSlot> slots) { - this.queueId = par.getStreamId() + System.currentTimeMillis();// simply generate a queue - this.dedicated = isDedicated; - dedicateOption = new HashMap<String, Object>(); - dedicateOption.putAll(options); - this.workingSlots.addAll(slots); - } - - public Map<String, Object> getDedicateOption() { - return dedicateOption; - } - - public void setDedicateOption(Map<String, Object> dedicateOption) { - this.dedicateOption = dedicateOption; - } - - public List<WorkSlot> getWorkingSlots() { - return workingSlots; - } - - public boolean isDedicated() { - return dedicated; - } - - public void setDedicated(boolean dedicated) { - this.dedicated = dedicated; - } - - @org.codehaus.jackson.annotate.JsonIgnore - @JsonIgnore - public int getQueueSize() { - return workingSlots.size(); - } - -// @org.codehaus.jackson.annotate.JsonIgnore -// @JsonIgnore -// public void placePolicy(PolicyDefinition pd) { -// policies.add(pd.getName()); -// } - - public int getNumberOfGroupBolts() { - return numberOfGroupBolts; - } - - public void setNumberOfGroupBolts(int numberOfGroupBolts) { - this.numberOfGroupBolts = numberOfGroupBolts; - } - - public Map<String, Integer> getTopoGroupStartIndex() { - return topoGroupStartIndex; - } - - public void setTopoGroupStartIndex(Map<String, Integer> topoGroupStartIndex) { - this.topoGroupStartIndex = topoGroupStartIndex; - } - - @org.codehaus.jackson.annotate.JsonIgnore - @JsonIgnore - public int getTopologyGroupStartIndex(String topo) { - if (topoGroupStartIndex.containsKey(topo)) { - return this.topoGroupStartIndex.get(topo); - } - return -1; - } - - public String getQueueId() { - return queueId; - } - - public void 
setQueueId(String queueId) { - this.queueId = queueId; - } - -}