This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch 6.0 in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/6.0 by this push: new 06165a0 Feature/oap/storage (#1516) 06165a0 is described below commit 06165a0359c38c655e71ca8c29c9c06ccd4aec53 Author: 彭勇升 pengys <8082...@qq.com> AuthorDate: Fri Aug 3 12:01:53 2018 +0800 Feature/oap/storage (#1516) * Storage and Persistence. * Storage config. * Fixed the CI failure. --- .../apm/collector/storage/base/dao/IBatchDAO.java | 1 - oap-server/pom.xml | 4 +- .../oap/server/core/CoreModuleProvider.java | 4 +- .../oap/server/core/UnexpectedException.java | 13 +- .../EndpointLatencyAvgAggregateWorker.java | 10 +- .../endpoint/EndpointLatencyAvgIndicator.java | 56 +++++- .../EndpointLatencyAvgPersistentWorker.java | 4 + .../core/analysis/indicator/AvgIndicator.java | 12 +- .../server/core/analysis/indicator/Indicator.java | 14 +- .../indicator/annotation/IndicatorType.java | 2 + .../analysis/indicator/define/IndicatorMapper.java | 4 + .../analysis/worker/AbstractAggregatorWorker.java | 15 +- .../analysis/worker/AbstractPersistentWorker.java | 101 +++++++++++ .../core/analysis/worker/define/WorkerMapper.java | 6 +- .../oap/server/core/receiver/Endpoint.java | 4 + .../annotation/SourceType.java} | 6 +- .../{StorageModule.java => AbstractDAO.java} | 15 +- .../skywalking/oap/server/core/storage/DAO.java | 8 +- .../oap/server/core/storage}/IBatchDAO.java | 4 +- .../IPersistenceDAO.java} | 15 +- .../oap/server/core/storage/StorageException.java | 12 +- .../oap/server/core/storage/StorageInstaller.java | 81 +++++++++ .../oap/server/core/storage/StorageModule.java | 2 +- .../oap/server/core/storage/annotation/Column.java | 11 +- .../annotation/ColumnAnnotationRetrieval.java | 60 +++++++ .../ColumnDefine.java} | 21 ++- .../{StorageModule.java => define/ColumnName.java} | 22 ++- .../core/storage/define/ColumnTypeMapping.java | 10 +- .../server/core/storage/define/TableDefine.java | 21 ++- .../indicator/define/TestAvgIndicator.java | 19 +- .../core/storage/StorageInstallerTestCase.java | 77 ++++++++ 
oap-server/server-library/library-client/pom.xml | 2 +- .../client/elasticsearch/ElasticSearchClient.java | 194 +++++++++++++++++++++ .../ElasticSearchClientException.java} | 16 +- .../elasticsearch/ElasticSearchClientTestCase.java | 62 +++++++ .../oap/server/library/module/ModuleDefine.java | 8 +- .../src/main/resources/application.yml | 14 ++ .../storage-elasticsearch-plugin/pom.xml | 5 + .../StorageModuleElasticsearchConfig.java | 3 + .../StorageModuleElasticsearchProvider.java | 29 ++- .../elasticsearch/base/BatchProcessEsDAO.java | 72 ++++++++ .../elasticsearch/base/ColumnTypeEsMapping.java} | 24 ++- .../storage/plugin/elasticsearch/base/EsDAO.java | 71 ++++++++ .../elasticsearch/base/PersistenceEsDAO.java | 79 +++++++++ .../elasticsearch/base/StorageEsInstaller.java | 133 ++++++++++++++ .../ElasticSearchColumnTypeMappingTestCase.java} | 25 +-- 46 files changed, 1232 insertions(+), 139 deletions(-) diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java index 7dcf14f..2b1e335 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java @@ -16,7 +16,6 @@ * */ - package org.apache.skywalking.apm.collector.storage.base.dao; import java.util.List; diff --git a/oap-server/pom.xml b/oap-server/pom.xml index 65360c1..c34718b 100644 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -55,7 +55,7 @@ <h2.version>1.4.196</h2.version> <shardingjdbc.version>2.0.3</shardingjdbc.version> <commons-dbcp.version>1.4</commons-dbcp.version> - <elasticsearch.version>6.3.1</elasticsearch.version> + 
<elasticsearch.version>6.3.2</elasticsearch.version> <joda-time.version>2.9.9</joda-time.version> <kubernetes.version>2.0.0</kubernetes.version> </properties> @@ -142,7 +142,7 @@ </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> - <artifactId>elasticsearch-rest-client</artifactId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 757c34a..d116ae8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -48,7 +48,7 @@ public class CoreModuleProvider extends ModuleProvider { super(); this.moduleConfig = new CoreModuleConfig(); this.indicatorMapper = new IndicatorMapper(); - this.workerMapper = new WorkerMapper(getManager()); + this.workerMapper = new WorkerMapper(); } @Override public String name() { @@ -87,7 +87,7 @@ public class CoreModuleProvider extends ModuleProvider { try { indicatorMapper.load(); - workerMapper.load(); + workerMapper.load(getManager()); } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java similarity index 79% copy from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java index 7dcf14f..f290fd0 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java @@ -16,14 +16,13 @@ * */ - -package org.apache.skywalking.apm.collector.storage.base.dao; - -import java.util.List; +package org.apache.skywalking.oap.server.core; /** - * @author peng-yongsheng + * @author wu-sheng */ -public interface IBatchDAO extends DAO { - void batchPersistence(List<?> batchCollection); +public class UnexpectedException extends RuntimeException { + public UnexpectedException(String message) { + super(message); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java index 62d5134..f9177fe 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java @@ -20,23 +20,17 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint; import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker; import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.slf4j.*; /** * @author peng-yongsheng */ public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> { - private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class); - - private final EndpointLatencyAvgRemoteWorker remoter; - public 
EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) { super(moduleManager); - this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager); } - @Override protected void onNext(EndpointLatencyAvgIndicator data) { - remoter.in(data); + @Override public Class nextWorkerClass() { + return EndpointLatencyAvgRemoteWorker.class; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java index e339afa..7054f3d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java @@ -18,16 +18,33 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint; +import java.util.*; import lombok.*; -import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; /** * @author peng-yongsheng */ public class EndpointLatencyAvgIndicator extends AvgIndicator { - @Setter @Getter private int id; + private static final String NAME = "endpoint_latency_avg"; + private static final String ID = "id"; + private static final String SERVICE_ID = "service_id"; + private static final String SERVICE_INSTANCE_ID = "service_instance_id"; + + @Setter @Getter @Column(columnName = ID) private int id; + @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId; + @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId; + + @Override public String name() { + return NAME; + } + + @Override public String 
id() { + return String.valueOf(id); + } @Override public int hashCode() { int result = 17; @@ -56,18 +73,49 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator { @Override public RemoteData.Builder serialize() { RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); remoteBuilder.setDataIntegers(0, getId()); - remoteBuilder.setDataIntegers(1, getCount()); + remoteBuilder.setDataIntegers(1, getServiceId()); + remoteBuilder.setDataIntegers(2, getServiceInstanceId()); + remoteBuilder.setDataIntegers(3, getCount()); remoteBuilder.setDataLongs(0, getTimeBucket()); remoteBuilder.setDataLongs(1, getSummation()); + remoteBuilder.setDataLongs(2, getValue()); + return remoteBuilder; } @Override public void deserialize(RemoteData remoteData) { setId(remoteData.getDataIntegers(0)); - setCount(remoteData.getDataIntegers(1)); + setServiceId(remoteData.getDataIntegers(1)); + setServiceInstanceId(remoteData.getDataIntegers(2)); + setCount(remoteData.getDataIntegers(3)); setTimeBucket(remoteData.getDataLongs(0)); setSummation(remoteData.getDataLongs(1)); + setValue(remoteData.getDataLongs(2)); + } + + @Override public Map<String, Object> toMap() { + Map<String, Object> map = new HashMap<>(); + map.put(ID, id); + map.put(SERVICE_ID, serviceId); + map.put(SERVICE_INSTANCE_ID, serviceInstanceId); + map.put(COUNT, getCount()); + map.put(SUMMATION, getSummation()); + map.put(VALUE, getValue()); + map.put(TIME_BUCKET, getTimeBucket()); + return map; + } + + @Override public Indicator newOne(Map<String, Object> dbMap) { + EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(); + indicator.setId((Integer)dbMap.get(ID)); + indicator.setServiceId((Integer)dbMap.get(SERVICE_ID)); + indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID)); + indicator.setCount((Integer)dbMap.get(COUNT)); + indicator.setSummation((Long)dbMap.get(SUMMATION)); + indicator.setValue((Long)dbMap.get(VALUE)); + indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET)); 
+ return indicator; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java index e3c5c23..288d568 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java @@ -29,4 +29,8 @@ public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) { super(moduleManager); } + + @Override protected boolean needMergeDBData() { + return true; + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java index da065f3..1bae8ec 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java @@ -21,15 +21,21 @@ package org.apache.skywalking.oap.server.core.analysis.indicator; import lombok.*; import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*; import org.apache.skywalking.oap.server.core.remote.selector.Selector; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; /** * @author peng-yongsheng */ -@IndicatorType(selector = Selector.HashCode) +@IndicatorType(selector = Selector.HashCode, needMerge = true) public abstract class AvgIndicator extends Indicator { - @Getter @Setter private long summation; - @Getter @Setter private int count; + protected 
static final String SUMMATION = "summation"; + protected static final String COUNT = "count"; + protected static final String VALUE = "value"; + + @Getter @Setter @Column(columnName = SUMMATION) private long summation; + @Getter @Setter @Column(columnName = COUNT) private int count; + @Getter @Setter @Column(columnName = VALUE) private long value; @Entrance public final void combine(@SourceFrom long summation, @ConstOne int count) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java index 533379e..de93255 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java @@ -18,15 +18,27 @@ package org.apache.skywalking.oap.server.core.analysis.indicator; +import java.util.Map; import lombok.*; import org.apache.skywalking.oap.server.core.analysis.data.StreamData; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; /** * @author peng-yongsheng */ public abstract class Indicator extends StreamData { - @Getter @Setter private long timeBucket; + protected static final String TIME_BUCKET = "time_bucket"; + + @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket; + + public abstract String id(); public abstract void combine(Indicator indicator); + + public abstract String name(); + + public abstract Map<String, Object> toMap(); + + public abstract Indicator newOne(Map<String, Object> dbMap); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java index 46f0345..d1ad273 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java @@ -28,4 +28,6 @@ import org.apache.skywalking.oap.server.core.remote.selector.Selector; @Retention(RetentionPolicy.SOURCE) public @interface IndicatorType { Selector selector(); + + boolean needMerge(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java index 8513fa2..b60e3f7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java @@ -79,4 +79,8 @@ public class IndicatorMapper implements Service { public Class<Indicator> findClassById(int id) { return idKeyMapping.get(id); } + + public Collection<Class<Indicator>> indicatorClasses() { + return idKeyMapping.values(); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java index 65d68b4..1fadf90 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java @@ -21,8 +21,10 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import 
org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.slf4j.*; @@ -33,11 +35,14 @@ public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class); + private Worker worker; + private final ModuleManager moduleManager; private final DataCarrier<INPUT> dataCarrier; private final MergeDataCache<INPUT> mergeDataCache; private int messageNum; public AbstractAggregatorWorker(ModuleManager moduleManager) { + this.moduleManager = moduleManager; this.mergeDataCache = new MergeDataCache<>(); this.dataCarrier = new DataCarrier<>(1, 10000); this.dataCarrier.consume(new AggregatorConsumer(this), 1); @@ -78,7 +83,15 @@ public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends mergeDataCache.finishReadingLast(); } - protected abstract void onNext(INPUT data); + private void onNext(INPUT data) { + if (worker == null) { + WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class); + worker = workerMapper.findInstanceByClass(nextWorkerClass()); + } + worker.in(data); + } + + public abstract Class nextWorkerClass(); private void aggregate(INPUT message) { mergeDataCache.writing(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java index bcead67..59f1502 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java @@ -18,18 +18,119 @@ package org.apache.skywalking.oap.server.core.analysis.worker; +import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.slf4j.*; + +import static java.util.Objects.nonNull; /** * @author peng-yongsheng */ public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> { + private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentWorker.class); + + private final MergeDataCache<INPUT> mergeDataCache; + private final IBatchDAO batchDAO; + private final IPersistenceDAO<?, ?, INPUT> persistenceDAO; + private final int blockBatchPersistenceSize = 1000; + public AbstractPersistentWorker(ModuleManager moduleManager) { + this.mergeDataCache = new MergeDataCache<>(); + this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class); + this.persistenceDAO = moduleManager.find(StorageModule.NAME).getService(IPersistenceDAO.class); + } + + public final Window<MergeDataCollection<INPUT>> getCache() { + return mergeDataCache; } @Override public final void in(INPUT input) { + if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) { + try { + if (getCache().trySwitchPointer()) { + getCache().switchPointer(); + + List<?> collection = buildBatchCollection(); + batchDAO.batchPersistence(collection); + } + } finally { + getCache().trySwitchPointerFinally(); + } + } + cacheData(input); + } + + public final List<?> buildBatchCollection() { + List<?> batchCollection = new LinkedList<>(); + try { + while 
(getCache().getLast().isWriting()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + logger.warn("thread wake up"); + } + } + if (getCache().getLast().collection() != null) { + batchCollection = prepareBatch(getCache().getLast()); + } + } finally { + getCache().finishReadingLast(); + } + return batchCollection; } + + private List<Object> prepareBatch(MergeDataCollection<INPUT> collection) { + List<Object> batchCollection = new LinkedList<>(); + collection.collection().forEach((id, data) -> { + if (needMergeDBData()) { + INPUT dbData = null; + try { + dbData = persistenceDAO.get(data); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } + if (nonNull(dbData)) { + dbData.combine(data); + try { + batchCollection.add(persistenceDAO.prepareBatchUpdate(dbData)); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } + } else { + try { + batchCollection.add(persistenceDAO.prepareBatchInsert(data)); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } + } + } else { + try { + batchCollection.add(persistenceDAO.prepareBatchInsert(data)); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } + } + }); + + return batchCollection; + } + + private void cacheData(INPUT input) { + mergeDataCache.writing(); + if (mergeDataCache.containsKey(input)) { + mergeDataCache.get(input).combine(input); + } else { + mergeDataCache.put(input); + } + + mergeDataCache.finishWriting(); + } + + protected abstract boolean needMergeDBData(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java index 9732430..5d81b9b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java +++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java @@ -34,14 +34,12 @@ public class WorkerMapper implements Service { private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class); private int id = 0; - private final ModuleManager moduleManager; private final Map<Class<Worker>, Integer> classKeyMapping; private final Map<Integer, Class<Worker>> idKeyMapping; private final Map<Class<Worker>, Worker> classKeyInstanceMapping; private final Map<Integer, Worker> idKeyInstanceMapping; - public WorkerMapper(ModuleManager moduleManager) { - this.moduleManager = moduleManager; + public WorkerMapper() { this.classKeyMapping = new HashMap<>(); this.idKeyMapping = new HashMap<>(); this.classKeyInstanceMapping = new HashMap<>(); @@ -49,7 +47,7 @@ public class WorkerMapper implements Service { } @SuppressWarnings(value = "unchecked") - public void load() throws WorkerDefineLoadException { + public void load(ModuleManager moduleManager) throws WorkerDefineLoadException { try { List<String> workerClasses = new LinkedList<>(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java index 24fe14d..c6be8d2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java @@ -20,10 +20,12 @@ package org.apache.skywalking.oap.server.core.receiver; import lombok.*; import org.apache.skywalking.apm.network.language.agent.SpanLayer; +import org.apache.skywalking.oap.server.core.receiver.annotation.SourceType; /** * @author peng-yongsheng */ +@SourceType public class Endpoint extends Source { @Override public Scope scope() { return Scope.Endpoint; @@ -31,7 +33,9 @@ public class Endpoint extends Source { 
@Getter @Setter private int id; @Getter @Setter private String name; + @Getter @Setter private int serviceId; @Getter @Setter private String serviceName; + @Getter @Setter private int serviceInstanceId; @Getter @Setter private String serviceInstanceName; @Getter @Setter private int latency; @Getter @Setter private boolean status; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java similarity index 81% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java index 46f0345..2065022 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java @@ -16,16 +16,14 @@ * */ -package org.apache.skywalking.oap.server.core.analysis.indicator.annotation; +package org.apache.skywalking.oap.server.core.receiver.annotation; import java.lang.annotation.*; -import org.apache.skywalking.oap.server.core.remote.selector.Selector; /** * @author peng-yongsheng */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.SOURCE) -public @interface IndicatorType { - Selector selector(); +public @interface SourceType { } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java similarity index 75% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java index ccd7db0..db65fd7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java @@ -18,20 +18,19 @@ package org.apache.skywalking.oap.server.core.storage; -import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.client.Client; /** * @author peng-yongsheng */ -public class StorageModule extends ModuleDefine { +public abstract class AbstractDAO<C extends Client> implements DAO { + private final C client; - public static final String NAME = "storage"; - - @Override public String name() { - return NAME; + public AbstractDAO(C client) { + this.client = client; } - @Override public Class[] services() { - return new Class[] {}; + public final C getClient() { + return client; } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java similarity index 82% copy from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java index 7dcf14f..ff1af29 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java @@ -16,14 +16,12 @@ * */ +package org.apache.skywalking.oap.server.core.storage; -package org.apache.skywalking.apm.collector.storage.base.dao; - -import java.util.List; +import 
org.apache.skywalking.oap.server.library.module.Service; /** * @author peng-yongsheng */ -public interface IBatchDAO extends DAO { - void batchPersistence(List<?> batchCollection); +public interface DAO extends Service { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java similarity index 93% copy from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java index 7dcf14f..5c7a88a 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java @@ -16,8 +16,7 @@ * */ - -package org.apache.skywalking.apm.collector.storage.base.dao; +package org.apache.skywalking.oap.server.core.storage; import java.util.List; @@ -25,5 +24,6 @@ import java.util.List; * @author peng-yongsheng */ public interface IBatchDAO extends DAO { + void batchPersistence(List<?> batchCollection); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java similarity index 70% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java index bcead67..2a9d1f9 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java @@ -16,20 +16,21 @@ * */ -package org.apache.skywalking.oap.server.core.analysis.worker; +package org.apache.skywalking.oap.server.core.storage; +import java.io.IOException; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; -import org.apache.skywalking.oap.server.library.module.ModuleManager; /** * @author peng-yongsheng */ -public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> { +public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator> extends DAO { - public AbstractPersistentWorker(ModuleManager moduleManager) { - } + INPUT get(INPUT input) throws IOException; - @Override public final void in(INPUT input) { + INSERT prepareBatchInsert(INPUT input) throws IOException; - } + UPDATE prepareBatchUpdate(INPUT input) throws IOException; + + void deleteHistory(Long timeBucketBefore); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java similarity index 82% copy from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java index 7dcf14f..b9a4d94 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java @@ -16,14 +16,14 @@ * */ - -package 
org.apache.skywalking.apm.collector.storage.base.dao; - -import java.util.List; +package org.apache.skywalking.oap.server.core.storage; /** * @author peng-yongsheng */ -public interface IBatchDAO extends DAO { - void batchPersistence(List<?> batchCollection); +public class StorageException extends Exception { + + public StorageException(String message) { + super(message); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java new file mode 100644 index 0000000..9e4545c --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.core.storage; + +import java.util.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper; +import org.apache.skywalking.oap.server.core.storage.annotation.ColumnAnnotationRetrieval; +import org.apache.skywalking.oap.server.core.storage.define.*; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public abstract class StorageInstaller { + + private static final Logger logger = LoggerFactory.getLogger(StorageInstaller.class); + + private final ModuleManager moduleManager; + private final ColumnAnnotationRetrieval annotationRetrieval; + + public StorageInstaller(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + this.annotationRetrieval = new ColumnAnnotationRetrieval(); + } + + public final void install(Client client) throws StorageException { + IndicatorMapper indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class); + Collection<Class<Indicator>> indicatorClasses = indicatorMapper.indicatorClasses(); + + Boolean debug = System.getProperty("debug") != null; + for (Class<Indicator> indicatorClass : indicatorClasses) { + List<ColumnDefine> columnDefines = annotationRetrieval.retrieval(indicatorClass); + + String tableName; + try { + tableName = indicatorClass.newInstance().name(); + } catch (InstantiationException | IllegalAccessException e) { + throw new StorageException(e.getMessage()); + } + TableDefine tableDefine = new TableDefine(tableName, columnDefines); + + if (!isExists(client, tableDefine)) { + logger.info("table: {} not exists", tableDefine.getName()); + createTable(client, tableDefine); + } else if (debug) { + logger.info("table: {} exists", 
tableDefine.getName()); + deleteTable(client, tableDefine); + createTable(client, tableDefine); + } + columnCheck(client, tableDefine); + } + } + + protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException; + + protected abstract void columnCheck(Client client, TableDefine tableDefine) throws StorageException; + + protected abstract void deleteTable(Client client, TableDefine tableDefine) throws StorageException; + + protected abstract void createTable(Client client, TableDefine tableDefine) throws StorageException; +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java index ccd7db0..559eb5c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java @@ -32,6 +32,6 @@ public class StorageModule extends ModuleDefine { } @Override public Class[] services() { - return new Class[] {}; + return new Class[] {IBatchDAO.class, IPersistenceDAO.class}; } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java similarity index 79% copy from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java index 7dcf14f..aa6828b 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java @@ -16,14 +16,15 @@ * */ +package org.apache.skywalking.oap.server.core.storage.annotation; -package org.apache.skywalking.apm.collector.storage.base.dao; - -import java.util.List; +import java.lang.annotation.*; /** * @author peng-yongsheng */ -public interface IBatchDAO extends DAO { - void batchPersistence(List<?> batchCollection); +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +public @interface Column { + String columnName(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java new file mode 100644 index 0000000..cf6a3dc --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.core.storage.annotation; + +import java.lang.reflect.Field; +import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.storage.define.*; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class ColumnAnnotationRetrieval { + + private static final Logger logger = LoggerFactory.getLogger(ColumnAnnotationRetrieval.class); + + public List<ColumnDefine> retrieval(Class<Indicator> indicatorClass) { + if (logger.isDebugEnabled()) { + logger.debug("Retrieval column annotation from class {}", indicatorClass.getName()); + } + List<ColumnDefine> columnDefines = new LinkedList<>(); + retrieval(indicatorClass, columnDefines); + return columnDefines; + } + + private void retrieval(Class clazz, List<ColumnDefine> columnDefines) { + Field[] fields = clazz.getDeclaredFields(); + + for (Field field : fields) { + if (field.isAnnotationPresent(Column.class)) { + Column column = field.getAnnotation(Column.class); + columnDefines.add(new ColumnDefine(new ColumnName(column.columnName(), column.columnName()), field.getType())); + if (logger.isDebugEnabled()) { + logger.debug("The field named {} with the {} type", column.columnName(), field.getType()); + } + } + } + + if (Objects.nonNull(clazz.getSuperclass())) { + retrieval(clazz.getSuperclass(), columnDefines); + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java similarity index 66% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java index ccd7db0..9bf4a87 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java @@ -16,22 +16,25 @@ * */ -package org.apache.skywalking.oap.server.core.storage; - -import org.apache.skywalking.oap.server.library.module.ModuleDefine; +package org.apache.skywalking.oap.server.core.storage.define; /** * @author peng-yongsheng */ -public class StorageModule extends ModuleDefine { +public class ColumnDefine { + private final ColumnName columnName; + private final Class<?> type; - public static final String NAME = "storage"; + public ColumnDefine(ColumnName columnName, Class<?> type) { + this.columnName = columnName; + this.type = type; + } - @Override public String name() { - return NAME; + public final ColumnName getColumnName() { + return columnName; } - @Override public Class[] services() { - return new Class[] {}; + public final Class<?> getType() { + return type; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java similarity index 63% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java index ccd7db0..8dc8882 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java @@ -16,22 +16,26 @@ * */ -package org.apache.skywalking.oap.server.core.storage; - -import org.apache.skywalking.oap.server.library.module.ModuleDefine; +package org.apache.skywalking.oap.server.core.storage.define; /** * @author peng-yongsheng */ -public class 
StorageModule extends ModuleDefine { +public class ColumnName { + private final String fullName; + private final String shortName; + private boolean useShortName = false; - public static final String NAME = "storage"; + public ColumnName(String fullName, String shortName) { + this.fullName = fullName; + this.shortName = shortName; + } - @Override public String name() { - return NAME; + public String getName() { + return useShortName ? shortName : fullName; } - @Override public Class[] services() { - return new Class[] {}; + public void useShortName() { + this.useShortName = true; } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java similarity index 82% copy from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java index 7dcf14f..3a6c3a8 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java @@ -16,14 +16,12 @@ * */ - -package org.apache.skywalking.apm.collector.storage.base.dao; - -import java.util.List; +package org.apache.skywalking.oap.server.core.storage.define; /** * @author peng-yongsheng */ -public interface IBatchDAO extends DAO { - void batchPersistence(List<?> batchCollection); +public interface ColumnTypeMapping { + + String transform(Class<?> type); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java similarity index 64% copy from apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java index 7dcf14f..e0f59ff 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java @@ -16,14 +16,27 @@ * */ - -package org.apache.skywalking.apm.collector.storage.base.dao; +package org.apache.skywalking.oap.server.core.storage.define; import java.util.List; /** * @author peng-yongsheng */ -public interface IBatchDAO extends DAO { - void batchPersistence(List<?> batchCollection); +public class TableDefine { + private final String name; + private final List<ColumnDefine> columnDefines; + + public TableDefine(String name, List<ColumnDefine> columnDefines) { + this.name = name; + this.columnDefines = columnDefines; + } + + public final String getName() { + return name; + } + + public final List<ColumnDefine> getColumnDefines() { + return columnDefines; + } } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java index 17d1189..9865ff3 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java @@ -18,8 +18,9 @@ package 
org.apache.skywalking.oap.server.core.analysis.indicator.define; +import java.util.Map; import lombok.*; -import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; /** @@ -35,4 +36,20 @@ public class TestAvgIndicator extends AvgIndicator { @Override public void deserialize(RemoteData remoteData) { } + + @Override public String id() { + return null; + } + + @Override public String name() { + return null; + } + + @Override public Map<String, Object> toMap() { + return null; + } + + @Override public Indicator newOne(Map<String, Object> dbMap) { + return null; + } } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java new file mode 100644 index 0000000..533c1be --- /dev/null +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.core.storage; + +import java.util.LinkedList; +import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.define.*; +import org.apache.skywalking.oap.server.core.storage.define.TableDefine; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.module.*; +import org.junit.Test; +import org.mockito.Mockito; +import org.powermock.reflect.Whitebox; + +/** + * @author peng-yongsheng + */ +public class StorageInstallerTestCase { + + @Test + public void testInstall() throws StorageException, DuplicateProviderException, ServiceNotProvidedException, IndicatorDefineLoadException { + IndicatorMapper indicatorMapper = new IndicatorMapper(); + CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); + CoreModule moduleDefine = Mockito.spy(CoreModule.class); + ModuleManager moduleManager = Mockito.mock(ModuleManager.class); + + LinkedList<ModuleProvider> moduleProviders = Whitebox.getInternalState(moduleDefine, "loadedProviders"); + moduleProviders.add(moduleProvider); + + Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine); + Mockito.when(moduleProvider.getService(IndicatorMapper.class)).thenReturn(indicatorMapper); + + indicatorMapper.load(); + + TestStorageInstaller installer = new TestStorageInstaller(moduleManager); + installer.install(null); + } + + class TestStorageInstaller extends StorageInstaller { + + public TestStorageInstaller(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException { + return false; + } + + @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException { + + } + + @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException { + + } + + @Override protected void 
createTable(Client client, TableDefine tableDefine) throws StorageException { + + } + } +} diff --git a/oap-server/server-library/library-client/pom.xml b/oap-server/server-library/library-client/pom.xml index 7ceece8..98fa2de 100644 --- a/oap-server/server-library/library-client/pom.xml +++ b/oap-server/server-library/library-client/pom.xml @@ -49,7 +49,7 @@ </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> - <artifactId>elasticsearch-rest-client</artifactId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> </dependency> </dependencies> </project> \ No newline at end of file diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java new file mode 100644 index 0000000..520e3f0 --- /dev/null +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.library.client.elasticsearch; + +import java.io.IOException; +import java.util.*; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.*; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.client.*; +import org.elasticsearch.action.admin.indices.create.*; +import org.elasticsearch.action.admin.indices.delete.*; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.bulk.*; +import org.elasticsearch.action.get.*; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.*; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.*; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.*; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class ElasticSearchClient implements Client { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class); + + private static final String TYPE = "type"; + private final String clusterNodes; + private final NameSpace namespace; + private RestHighLevelClient client; + + public ElasticSearchClient(String clusterNodes, NameSpace namespace) { + this.clusterNodes = clusterNodes; + this.namespace = namespace; + } + + @Override public void initialize() { + List<HttpHost> pairsList = parseClusterNodes(clusterNodes); + + client = new RestHighLevelClient( + RestClient.builder(pairsList.toArray(new HttpHost[0]))); + } + + @Override public void shutdown() { + try { + client.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + + private List<HttpHost> parseClusterNodes(String nodes) 
{ + List<HttpHost> httpHosts = new LinkedList<>(); + logger.info("elasticsearch cluster nodes: {}", nodes); + String[] nodesSplit = nodes.split(","); + for (String node : nodesSplit) { + String host = node.split(":")[0]; + String port = node.split(":")[1]; + httpHosts.add(new HttpHost(host, Integer.valueOf(port))); + } + + return httpHosts; + } + + public boolean createIndex(String indexName, Settings settings, + XContentBuilder mappingBuilder) throws IOException { + indexName = formatIndexName(indexName); + CreateIndexRequest request = new CreateIndexRequest(indexName); + request.settings(settings); + request.mapping(TYPE, mappingBuilder); + CreateIndexResponse response; + response = client.indices().create(request); + logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); + return response.isAcknowledged(); + } + + public boolean deleteIndex(String indexName) throws IOException { + indexName = formatIndexName(indexName); + DeleteIndexRequest request = new DeleteIndexRequest(indexName); + DeleteIndexResponse response; + response = client.indices().delete(request); + logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); + return response.isAcknowledged(); + } + + public boolean isExistsIndex(String indexName) throws IOException { + indexName = formatIndexName(indexName); + GetIndexRequest request = new GetIndexRequest(); + request.indices(indexName); + return client.indices().exists(request); + } + + public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException { + indexName = formatIndexName(indexName); + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.types(TYPE); + searchRequest.source(searchSourceBuilder); + return client.search(searchRequest); + } + + public GetResponse get(String indexName, String id) throws IOException { + indexName = formatIndexName(indexName); + GetRequest request = new 
GetRequest(indexName, TYPE, id); + return client.get(request); + } + + public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) { + indexName = formatIndexName(indexName); + return new IndexRequest(indexName, TYPE, id).source(source); + } + + public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) { + indexName = formatIndexName(indexName); + return new UpdateRequest(indexName, TYPE, id).doc(source); + } + + public void delete(String indexName, String timeBucketColumnName, long startTimeBucket, + long endTimeBucket) throws IOException { + indexName = formatIndexName(indexName); + Map<String, String> params = Collections.singletonMap("pretty", "true"); + String jsonString = "{" + + " \"query\": {" + + " \"range\": {" + + " \"" + timeBucketColumnName + "\": {" + + " \"gte\": " + startTimeBucket + "," + + " \"lte\": " + endTimeBucket + "" + + " }" + + " }" + + " }" + + "}"; + HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON); + client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity); + } + + private String formatIndexName(String indexName) { + if (Objects.nonNull(namespace) && StringUtils.isNotEmpty(namespace.getNameSpace())) { + return namespace.getNameSpace() + "_" + indexName; + } + return indexName; + } + + public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, + int concurrentRequests) { + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, + BulkResponse response) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure); + } + }; + + return BulkProcessor.builder(client::bulkAsync, 
listener) + .setBulkActions(bulkActions) + .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB)) + .setFlushInterval(TimeValue.timeValueSeconds(flushInterval)) + .setConcurrentRequests(concurrentRequests) + .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) + .build(); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java similarity index 68% copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java copy to oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java index ccd7db0..bb71d07 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java @@ -16,22 +16,20 @@ * */ -package org.apache.skywalking.oap.server.core.storage; +package org.apache.skywalking.oap.server.library.client.elasticsearch; -import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.client.ClientException; /** * @author peng-yongsheng */ -public class StorageModule extends ModuleDefine { +public class ElasticSearchClientException extends ClientException { - public static final String NAME = "storage"; - - @Override public String name() { - return NAME; + public ElasticSearchClientException(String message) { + super(message); } - @Override public Class[] services() { - return new Class[] {}; + public ElasticSearchClientException(String message, Throwable cause) { + super(message, cause); } } diff --git 
a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java new file mode 100644 index 0000000..f696202 --- /dev/null +++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.library.client.elasticsearch; + +import java.io.IOException; +import org.apache.skywalking.oap.server.library.client.ClientException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.*; +import org.junit.Assert; + +/** + * @author peng-yongsheng + */ +public class ElasticSearchClientTestCase { + + public static void main(String[] args) throws IOException, ClientException { + Settings settings = Settings.builder() + .put("number_of_shards", 2) + .put("number_of_replicas", 0) + .build(); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject() + .startObject("_all") + .field("enabled", false) + .endObject() + .startObject("properties") + .startObject("column1") + .field("type", "text") + .endObject() + .endObject(); + builder.endObject(); + + ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null); + client.initialize(); + + String indexName = "test"; + client.createIndex(indexName, settings, builder); + Assert.assertTrue(client.isExistsIndex(indexName)); + client.deleteIndex(indexName); + Assert.assertFalse(client.isExistsIndex(indexName)); + + + client.shutdown(); + } +} diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java index 74c908d..5278215 100644 --- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java +++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java @@ -18,11 +18,9 @@ package org.apache.skywalking.oap.server.library.module; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.lang.reflect.Field; import java.util.*; +import org.slf4j.*; /** * A 
module definition. @@ -31,7 +29,7 @@ import java.util.*; */ public abstract class ModuleDefine { - private final Logger logger = LoggerFactory.getLogger(ModuleDefine.class); + private static final Logger logger = LoggerFactory.getLogger(ModuleDefine.class); private LinkedList<ModuleProvider> loadedProviders = new LinkedList<>(); @@ -128,7 +126,7 @@ public abstract class ModuleDefine { return loadedProviders; } - final ModuleProvider provider() throws DuplicateProviderException { + public final ModuleProvider provider() throws DuplicateProviderException { if (loadedProviders.size() > 1) { throw new DuplicateProviderException(this.name() + " module exist " + loadedProviders.size() + " providers"); } diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index cabf8a2..46c3535 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -35,6 +35,20 @@ core: gRPCPort: 11800 storage: elasticsearch: + clusterNodes: localhost:9200 + indexShardsNumber: 2 + indexReplicasNumber: 0 + # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html + bulkActions: 2000 # Execute the bulk every 2000 requests + bulkSize: 20 # flush the bulk every 20mb + flushInterval: 10 # flush the bulk every 10 seconds whatever the number of requests + concurrentRequests: 2 # the number of concurrent requests + # Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted. 
+ traceDataTTL: 90 # Unit is minute + minuteMetricDataTTL: 90 # Unit is minute + hourMetricDataTTL: 36 # Unit is hour + dayMetricDataTTL: 45 # Unit is day + monthMetricDataTTL: 18 # Unit is month service-mesh: default: query: diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml index e0adc43..6c93cb6 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml @@ -36,5 +36,10 @@ <artifactId>server-core</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>library-client</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index 92768ad..09b0bb2 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch; +import lombok.*; import org.apache.skywalking.oap.server.library.module.ModuleConfig; /** @@ -25,6 +26,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; */ public class StorageModuleElasticsearchConfig extends ModuleConfig { + @Setter @Getter 
private String nameSpace; + @Setter @Getter private String clusterNodes; private int indexShardsNumber; private int indexReplicasNumber; private boolean highPerformanceMode; diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 23367a5..b4aa7e8 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -18,8 +18,11 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch; -import org.apache.skywalking.oap.server.core.storage.StorageModule; +import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.library.client.NameSpace; +import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*; import org.slf4j.*; /** @@ -29,11 +32,14 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class); - private final StorageModuleElasticsearchConfig storageConfig; + private final StorageModuleElasticsearchConfig config; + private final NameSpace nameSpace; + private ElasticSearchClient elasticSearchClient; public StorageModuleElasticsearchProvider() { super(); - this.storageConfig = new 
StorageModuleElasticsearchConfig(); + this.config = new StorageModuleElasticsearchConfig(); + this.nameSpace = new NameSpace(); } @Override @@ -42,21 +48,34 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { } @Override - public Class module() { + public Class<? extends ModuleDefine> module() { return StorageModule.class; } @Override public ModuleConfig createConfigBeanIfAbsent() { - return storageConfig; + return config; } @Override public void prepare() throws ServiceNotProvidedException { + elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), nameSpace); + + this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests())); + this.registerServiceImplementation(IPersistenceDAO.class, new PersistenceEsDAO(elasticSearchClient, nameSpace)); } @Override public void start() throws ModuleStartException { + try { + nameSpace.setNameSpace(config.getNameSpace()); + elasticSearchClient.initialize(); + + StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber()); + installer.install(elasticSearchClient); + } catch (StorageException e) { + throw new ModuleStartException(e.getMessage(), e); + } } @Override diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java new file mode 100644 index 0000000..0e9ff2f --- /dev/null +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.slf4j.*;
+
+/**
+ * {@link IBatchDAO} implementation that hands prepared index and update requests to an Elasticsearch
+ * {@link BulkProcessor}.
+ *
+ * @author peng-yongsheng
+ */
+public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchProcessEsDAO.class);
+
+    private BulkProcessor bulkProcessor;
+    private final int bulkActions;
+    private final int bulkSize;
+    private final int flushInterval;
+    private final int concurrentRequests;
+
+    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
+        int concurrentRequests) {
+        super(client);
+        this.bulkActions = bulkActions;
+        this.bulkSize = bulkSize;
+        this.flushInterval = flushInterval;
+        this.concurrentRequests = concurrentRequests;
+    }
+
+    /**
+     * Queues every {@link IndexRequest} and {@link UpdateRequest} of the collection on the bulk processor, which
+     * is created on the first call. Elements of any other type are ignored.
+     *
+     * @param batchCollection the prepared requests to queue
+     */
+    @Override public void batchPersistence(List<?> batchCollection) {
+        if (bulkProcessor == null) {
+            bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("bulk data size: {}", batchCollection.size());
+        }
+
+        if (!CollectionUtils.isNotEmpty(batchCollection)) {
+            return;
+        }
+
+        for (Object request : batchCollection) {
+            if (request instanceof IndexRequest) {
+                bulkProcessor.add((IndexRequest)request);
+            } else if (request instanceof UpdateRequest) {
+                bulkProcessor.add((UpdateRequest)request);
+            }
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
similarity index 52%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index bcead67..8e268c1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -16,20 +16,26 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.worker;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.core.storage.define.ColumnTypeMapping;
 
 /**
  * @author peng-yongsheng
  */
-public abstract
class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> {
-
-    public AbstractPersistentWorker(ModuleManager moduleManager) {
-    }
-
-    @Override public final void in(INPUT input) {
+public class ColumnTypeEsMapping implements ColumnTypeMapping {
+    /**
+     * Translates a Java field type into the type name written into the index mapping.
+     *
+     * @param type {@code int}, {@code long}, {@code double}, one of their wrapper classes, or {@code String}
+     * @return {@code "integer"}, {@code "long"}, {@code "double"} or {@code "text"}
+     * @throws IllegalArgumentException when the type is none of the supported ones
+     */
+    @Override public String transform(Class<?> type) {
+        if (type == int.class || type == Integer.class) {
+            return "integer";
+        }
+        if (type == long.class || type == Long.class) {
+            return "long";
+        }
+        if (type == double.class || type == Double.class) {
+            return "double";
+        }
+        if (type == String.class) {
+            return "text";
+        }
+        throw new IllegalArgumentException("Unsupported data type: " + type.getName());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
new file mode 100644
index 0000000..dd0a70b
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * Base class of the Elasticsearch DAOs. It offers helpers that query the largest or the smallest value of a
+ * numeric column.
+ *
+ * @author peng-yongsheng
+ */
+public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+
+    private static final Logger logger = LoggerFactory.getLogger(EsDAO.class);
+
+    /** Name under which the single aggregation of an id query is registered and read back. */
+    private static final String AGGREGATION_NAME = "agg";
+
+    public EsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    /**
+     * Queries the largest value of a column.
+     *
+     * @param indexName index to search
+     * @param columnName numeric field to aggregate
+     * @return the maximum truncated to {@code int}, or 0 when the search fails or the value saturates the
+     * {@code int} range
+     */
+    protected final int getMaxId(String indexName, String columnName) {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.aggregation(AggregationBuilders.max(AGGREGATION_NAME).field(columnName));
+        searchSourceBuilder.size(0);
+        return getResponse(indexName, searchSourceBuilder);
+    }
+
+    /**
+     * Queries the smallest value of a column.
+     *
+     * @param indexName index to search
+     * @param columnName numeric field to aggregate
+     * @return the minimum truncated to {@code int}, or 0 when the search fails or the value saturates the
+     * {@code int} range
+     */
+    protected final int getMinId(String indexName, String columnName) {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.aggregation(AggregationBuilders.min(AGGREGATION_NAME).field(columnName));
+        searchSourceBuilder.size(0);
+        return getResponse(indexName, searchSourceBuilder);
+    }
+
+    /**
+     * Runs the search and reads the single aggregated value.
+     *
+     * <p>The result is read through {@link NumericMetricsAggregation.SingleValue}, the interface shared by
+     * {@link Max} and {@code Min}; casting the result of a min aggregation to {@link Max} fails with a
+     * {@link ClassCastException}.
+     */
+    private int getResponse(String indexName, SearchSourceBuilder searchSourceBuilder) {
+        try {
+            SearchResponse searchResponse = getClient().search(indexName, searchSourceBuilder);
+            NumericMetricsAggregation.SingleValue agg = searchResponse.getAggregations().get(AGGREGATION_NAME);
+
+            // The int cast saturates: an infinite value (presumably what is reported when no document
+            // matches - TODO confirm) becomes Integer.MAX_VALUE or Integer.MIN_VALUE.
+            int id = (int)agg.value();
+            if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
+                return 0;
+            }
+            return id;
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        return 0;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
new file mode 100644
index 0000000..1f83b63
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.IPersistenceDAO;
+import org.apache.skywalking.oap.server.library.client.NameSpace;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.*;
+
+/**
+ * Elasticsearch implementation of {@link IPersistenceDAO} for indicators. Every indicator is addressed in the
+ * index named {@code <namespace>_<indicator name>} by its own id.
+ *
+ * @author peng-yongsheng
+ */
+public class PersistenceEsDAO implements IPersistenceDAO<IndexRequest, UpdateRequest, Indicator> {
+
+    private final ElasticSearchClient client;
+    private final NameSpace nameSpace;
+
+    public PersistenceEsDAO(ElasticSearchClient client, NameSpace nameSpace) {
+        this.client = client;
+        this.nameSpace = nameSpace;
+    }
+
+    /**
+     * Loads the stored document that has the id of the given indicator.
+     *
+     * @return an indicator created from the stored source, or {@code null} when no such document exists
+     */
+    @Override public Indicator get(Indicator input) throws IOException {
+        GetResponse stored = client.get(indexNameOf(input), input.id());
+        return stored.isExists() ? input.newOne(stored.getSource()) : null;
+    }
+
+    /**
+     * Prepares, without executing it, the request that inserts the indicator.
+     */
+    @Override public IndexRequest prepareBatchInsert(Indicator input) throws IOException {
+        XContentBuilder source = toSource(input);
+        return client.prepareInsert(indexNameOf(input), input.id(), source);
+    }
+
+    /**
+     * Prepares, without executing it, the request that updates the indicator.
+     */
+    @Override public UpdateRequest prepareBatchUpdate(Indicator input) throws IOException {
+        XContentBuilder source = toSource(input);
+        return client.prepareUpdate(indexNameOf(input), input.id(), source);
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    @Override public void deleteHistory(Long timeBucketBefore) {
+
+    }
+
+    /** Builds the index name of an indicator: the namespace, an underscore, then the indicator name. */
+    private String indexNameOf(Indicator input) {
+        return nameSpace.getNameSpace() + "_" + input.name();
+    }
+
+    /** Writes every entry of the indicator's map as one field of a JSON object. */
+    private XContentBuilder toSource(Indicator input) throws IOException {
+        XContentBuilder source = XContentFactory.jsonBuilder().startObject();
+        for (Map.Entry<String, Object> field : input.toMap().entrySet()) {
+            source.field(field.getKey(), field.getValue());
+        }
+        source.endObject();
+        return source;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
new file mode 100644
index 0000000..07868d8
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.*;
+import org.slf4j.*;
+
+/**
+ * {@link StorageInstaller} for Elasticsearch: each {@link TableDefine} is handled as one index whose mapping is
+ * generated from the column defines of the table.
+ *
+ * @author peng-yongsheng
+ */
+public class StorageEsInstaller extends StorageInstaller {
+
+    private static final Logger logger = LoggerFactory.getLogger(StorageEsInstaller.class);
+
+    private final int indexShardsNumber;
+    private final int indexReplicasNumber;
+    private final ColumnTypeEsMapping mapping;
+
+    public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
+        super(moduleManager);
+        this.indexShardsNumber = indexShardsNumber;
+        this.indexReplicasNumber = indexReplicasNumber;
+        this.mapping = new ColumnTypeEsMapping();
+    }
+
+    /**
+     * Tells whether the index named after the table exists.
+     *
+     * @throws StorageException when the client call fails with an I/O error
+     */
+    @Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+        try {
+            return esClient.isExistsIndex(tableDefine.getName());
+        } catch (IOException e) {
+            throw new StorageException(e.getMessage());
+        }
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
+
+    }
+
+    /**
+     * Deletes the index named after the table.
+     *
+     * @throws StorageException when the client reports a failed deletion or the call fails with an I/O error
+     */
+    @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+        try {
+            if (!esClient.deleteIndex(tableDefine.getName())) {
+                throw new StorageException(tableDefine.getName() + " index delete failure.");
+            }
+        } catch (IOException e) {
+            // Keep the reason of the I/O failure in the message instead of dropping it.
+            throw new StorageException(tableDefine.getName() + " index delete failure, " + e.getMessage());
+        }
+    }
+
+    /**
+     * Creates the index named after the table with the generated settings and mapping.
+     *
+     * @throws StorageException when the mapping cannot be built, the client call fails with an I/O error, or the
+     * creation is not acknowledged
+     */
+    @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+        Settings settings = createSettingBuilder();
+
+        // mapping
+        XContentBuilder mappingBuilder;
+        try {
+            mappingBuilder = createMappingBuilder(tableDefine);
+            logger.info("mapping builder str: {}", mappingBuilder.prettyPrint());
+        } catch (IOException | IllegalArgumentException e) {
+            // Stop here: going on would pass a null mapping to createIndex.
+            logger.error("create {} index mapping builder error", tableDefine.getName(), e);
+            throw new StorageException("create " + tableDefine.getName() + " index mapping builder error, " + e.getMessage());
+        }
+
+        boolean isAcknowledged;
+        try {
+            isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mappingBuilder);
+        } catch (IOException e) {
+            throw new StorageException(e.getMessage());
+        }
+        logger.info("create {} index finished, isAcknowledged: {}", tableDefine.getName(), isAcknowledged);
+
+        if (!isAcknowledged) {
+            throw new StorageException("create " + tableDefine.getName() + " index failure, ");
+        }
+    }
+
+    /**
+     * Builds the index settings: the configured shard and replica numbers, a refresh interval of
+     * {@code "3s"} and an analyzer named {@code collector_analyzer} of type {@code "stop"}.
+     */
+    private Settings createSettingBuilder() {
+        return Settings.builder()
+            .put("index.number_of_shards", indexShardsNumber)
+            .put("index.number_of_replicas", indexReplicasNumber)
+            .put("index.refresh_interval", "3s")
+            .put("analysis.analyzer.collector_analyzer.type", "stop")
+            .build();
+    }
+
+    /**
+     * Builds the index mapping: {@code _all} is disabled and every column define becomes one property whose type
+     * comes from {@link ColumnTypeEsMapping}.
+     *
+     * @throws IOException propagated from the content builder
+     */
+    private XContentBuilder createMappingBuilder(TableDefine tableDefine) throws IOException {
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
+            .startObject()
+            .startObject("_all")
+            .field("enabled", false)
+            .endObject()
+            .startObject("properties");
+
+        for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
+            mappingBuilder
+                .startObject(columnDefine.getColumnName().getName())
+                .field("type", mapping.transform(columnDefine.getType()))
+                .endObject();
+        }
+
+        mappingBuilder
+            .endObject()
+            .endObject();
+
+        logger.debug("create elasticsearch index: {}", mappingBuilder.prettyPrint());
+
+        return mappingBuilder;
+    }
+}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java similarity index 53% copy from oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java index 17d1189..2553ab7 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java @@ -16,23 +16,28 @@ * */ -package org.apache.skywalking.oap.server.core.analysis.indicator.define; +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; -import lombok.*; -import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator; -import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.junit.*; /** * @author peng-yongsheng */ -public class TestAvgIndicator extends AvgIndicator { +public class ElasticSearchColumnTypeMappingTestCase { - @Setter @Getter private int id; + /** + * Checks that ColumnTypeEsMapping returns "integer", "long" and "double" for the primitive and the wrapper types, and "text" for String. + */ + @Test + public void test() { + ColumnTypeEsMapping mapping = new ColumnTypeEsMapping(); - @Override public RemoteData.Builder serialize() { - return null; - } + Assert.assertEquals("integer", mapping.transform(int.class)); + Assert.assertEquals("integer", mapping.transform(Integer.class)); + + Assert.assertEquals("long", mapping.transform(long.class)); + Assert.assertEquals("long", mapping.transform(Long.class)); + + 
Assert.assertEquals("double", mapping.transform(double.class)); + Assert.assertEquals("double", mapping.transform(Double.class)); - @Override public void deserialize(RemoteData remoteData) { + Assert.assertEquals("text", mapping.transform(String.class)); } }