This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/6.0 by this push:
     new 06165a0  Feature/oap/storage (#1516)
06165a0 is described below

commit 06165a0359c38c655e71ca8c29c9c06ccd4aec53
Author: 彭勇升 pengys <8082...@qq.com>
AuthorDate: Fri Aug 3 12:01:53 2018 +0800

    Feature/oap/storage (#1516)
    
    * Storage and Persistence.
    
    * Storage config.
    
    * Fixed the CI failure.
---
 .../apm/collector/storage/base/dao/IBatchDAO.java  |   1 -
 oap-server/pom.xml                                 |   4 +-
 .../oap/server/core/CoreModuleProvider.java        |   4 +-
 .../oap/server/core/UnexpectedException.java       |  13 +-
 .../EndpointLatencyAvgAggregateWorker.java         |  10 +-
 .../endpoint/EndpointLatencyAvgIndicator.java      |  56 +++++-
 .../EndpointLatencyAvgPersistentWorker.java        |   4 +
 .../core/analysis/indicator/AvgIndicator.java      |  12 +-
 .../server/core/analysis/indicator/Indicator.java  |  14 +-
 .../indicator/annotation/IndicatorType.java        |   2 +
 .../analysis/indicator/define/IndicatorMapper.java |   4 +
 .../analysis/worker/AbstractAggregatorWorker.java  |  15 +-
 .../analysis/worker/AbstractPersistentWorker.java  | 101 +++++++++++
 .../core/analysis/worker/define/WorkerMapper.java  |   6 +-
 .../oap/server/core/receiver/Endpoint.java         |   4 +
 .../annotation/SourceType.java}                    |   6 +-
 .../{StorageModule.java => AbstractDAO.java}       |  15 +-
 .../skywalking/oap/server/core/storage/DAO.java    |   8 +-
 .../oap/server/core/storage}/IBatchDAO.java        |   4 +-
 .../IPersistenceDAO.java}                          |  15 +-
 .../oap/server/core/storage/StorageException.java  |  12 +-
 .../oap/server/core/storage/StorageInstaller.java  |  81 +++++++++
 .../oap/server/core/storage/StorageModule.java     |   2 +-
 .../oap/server/core/storage/annotation/Column.java |  11 +-
 .../annotation/ColumnAnnotationRetrieval.java      |  60 +++++++
 .../ColumnDefine.java}                             |  21 ++-
 .../{StorageModule.java => define/ColumnName.java} |  22 ++-
 .../core/storage/define/ColumnTypeMapping.java     |  10 +-
 .../server/core/storage/define/TableDefine.java    |  21 ++-
 .../indicator/define/TestAvgIndicator.java         |  19 +-
 .../core/storage/StorageInstallerTestCase.java     |  77 ++++++++
 oap-server/server-library/library-client/pom.xml   |   2 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 194 +++++++++++++++++++++
 .../ElasticSearchClientException.java}             |  16 +-
 .../elasticsearch/ElasticSearchClientTestCase.java |  62 +++++++
 .../oap/server/library/module/ModuleDefine.java    |   8 +-
 .../src/main/resources/application.yml             |  14 ++
 .../storage-elasticsearch-plugin/pom.xml           |   5 +
 .../StorageModuleElasticsearchConfig.java          |   3 +
 .../StorageModuleElasticsearchProvider.java        |  29 ++-
 .../elasticsearch/base/BatchProcessEsDAO.java      |  72 ++++++++
 .../elasticsearch/base/ColumnTypeEsMapping.java}   |  24 ++-
 .../storage/plugin/elasticsearch/base/EsDAO.java   |  71 ++++++++
 .../elasticsearch/base/PersistenceEsDAO.java       |  79 +++++++++
 .../elasticsearch/base/StorageEsInstaller.java     | 133 ++++++++++++++
 .../ElasticSearchColumnTypeMappingTestCase.java}   |  25 +--
 46 files changed, 1232 insertions(+), 139 deletions(-)

diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
index 7dcf14f..2b1e335 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
@@ -16,7 +16,6 @@
  *
  */
 
-
 package org.apache.skywalking.apm.collector.storage.base.dao;
 
 import java.util.List;
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 65360c1..c34718b 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -55,7 +55,7 @@
         <h2.version>1.4.196</h2.version>
         <shardingjdbc.version>2.0.3</shardingjdbc.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
-        <elasticsearch.version>6.3.1</elasticsearch.version>
+        <elasticsearch.version>6.3.2</elasticsearch.version>
         <joda-time.version>2.9.9</joda-time.version>
         <kubernetes.version>2.0.0</kubernetes.version>
     </properties>
@@ -142,7 +142,7 @@
             </dependency>
             <dependency>
                 <groupId>org.elasticsearch.client</groupId>
-                <artifactId>elasticsearch-rest-client</artifactId>
+                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                 <version>${elasticsearch.version}</version>
             </dependency>
             <dependency>
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 757c34a..d116ae8 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -48,7 +48,7 @@ public class CoreModuleProvider extends ModuleProvider {
         super();
         this.moduleConfig = new CoreModuleConfig();
         this.indicatorMapper = new IndicatorMapper();
-        this.workerMapper = new WorkerMapper(getManager());
+        this.workerMapper = new WorkerMapper();
     }
 
     @Override public String name() {
@@ -87,7 +87,7 @@ public class CoreModuleProvider extends ModuleProvider {
 
         try {
             indicatorMapper.load();
-            workerMapper.load();
+            workerMapper.load(getManager());
         } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
similarity index 79%
copy from 
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
index 7dcf14f..f290fd0 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
@@ -16,14 +16,13 @@
  *
  */
 
-
-package org.apache.skywalking.apm.collector.storage.base.dao;
-
-import java.util.List;
+package org.apache.skywalking.oap.server.core;
 
 /**
- * @author peng-yongsheng
+ * @author wu-sheng
  */
-public interface IBatchDAO extends DAO {
-    void batchPersistence(List<?> batchCollection);
+public class UnexpectedException extends RuntimeException {
+    public UnexpectedException(String message) {
+        super(message);
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
index 62d5134..f9177fe 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
@@ -20,23 +20,17 @@ package 
org.apache.skywalking.oap.server.core.analysis.endpoint;
 
 import 
org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointLatencyAvgAggregateWorker extends 
AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
-
-    private final EndpointLatencyAvgRemoteWorker remoter;
-
     public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
         super(moduleManager);
-        this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
     }
 
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
-        remoter.in(data);
+    @Override public Class nextWorkerClass() {
+        return EndpointLatencyAvgRemoteWorker.class;
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index e339afa..7054f3d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,16 +18,33 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
+import java.util.*;
 import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.*;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointLatencyAvgIndicator extends AvgIndicator {
 
-    @Setter @Getter private int id;
+    private static final String NAME = "endpoint_latency_avg";
+    private static final String ID = "id";
+    private static final String SERVICE_ID = "service_id";
+    private static final String SERVICE_INSTANCE_ID = "service_instance_id";
+
+    @Setter @Getter @Column(columnName = ID) private int id;
+    @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
+    @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int 
serviceInstanceId;
+
+    @Override public String name() {
+        return NAME;
+    }
+
+    @Override public String id() {
+        return String.valueOf(id);
+    }
 
     @Override public int hashCode() {
         int result = 17;
@@ -56,18 +73,49 @@ public class EndpointLatencyAvgIndicator extends 
AvgIndicator {
     @Override public RemoteData.Builder serialize() {
         RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
         remoteBuilder.setDataIntegers(0, getId());
-        remoteBuilder.setDataIntegers(1, getCount());
+        remoteBuilder.setDataIntegers(1, getServiceId());
+        remoteBuilder.setDataIntegers(2, getServiceInstanceId());
+        remoteBuilder.setDataIntegers(3, getCount());
 
         remoteBuilder.setDataLongs(0, getTimeBucket());
         remoteBuilder.setDataLongs(1, getSummation());
+        remoteBuilder.setDataLongs(2, getValue());
+
         return remoteBuilder;
     }
 
     @Override public void deserialize(RemoteData remoteData) {
         setId(remoteData.getDataIntegers(0));
-        setCount(remoteData.getDataIntegers(1));
+        setServiceId(remoteData.getDataIntegers(1));
+        setServiceInstanceId(remoteData.getDataIntegers(2));
+        setCount(remoteData.getDataIntegers(3));
 
         setTimeBucket(remoteData.getDataLongs(0));
         setSummation(remoteData.getDataLongs(1));
+        setValue(remoteData.getDataLongs(2));
+    }
+
+    @Override public Map<String, Object> toMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(ID, id);
+        map.put(SERVICE_ID, serviceId);
+        map.put(SERVICE_INSTANCE_ID, serviceInstanceId);
+        map.put(COUNT, getCount());
+        map.put(SUMMATION, getSummation());
+        map.put(VALUE, getValue());
+        map.put(TIME_BUCKET, getTimeBucket());
+        return map;
+    }
+
+    @Override public Indicator newOne(Map<String, Object> dbMap) {
+        EndpointLatencyAvgIndicator indicator = new 
EndpointLatencyAvgIndicator();
+        indicator.setId((Integer)dbMap.get(ID));
+        indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
+        
indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
+        indicator.setCount((Integer)dbMap.get(COUNT));
+        indicator.setSummation((Long)dbMap.get(SUMMATION));
+        indicator.setValue((Long)dbMap.get(VALUE));
+        indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
+        return indicator;
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
index e3c5c23..288d568 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
@@ -29,4 +29,8 @@ public class EndpointLatencyAvgPersistentWorker extends 
AbstractPersistentWorker
     public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
         super(moduleManager);
     }
+
+    @Override protected boolean needMergeDBData() {
+        return true;
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index da065f3..1bae8ec 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -21,15 +21,21 @@ package 
org.apache.skywalking.oap.server.core.analysis.indicator;
 import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
 import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
-@IndicatorType(selector = Selector.HashCode)
+@IndicatorType(selector = Selector.HashCode, needMerge = true)
 public abstract class AvgIndicator extends Indicator {
 
-    @Getter @Setter private long summation;
-    @Getter @Setter private int count;
+    protected static final String SUMMATION = "summation";
+    protected static final String COUNT = "count";
+    protected static final String VALUE = "value";
+
+    @Getter @Setter @Column(columnName = SUMMATION) private long summation;
+    @Getter @Setter @Column(columnName = COUNT) private int count;
+    @Getter @Setter @Column(columnName = VALUE) private long value;
 
     @Entrance
     public final void combine(@SourceFrom long summation, @ConstOne int count) 
{
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 533379e..de93255 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,15 +18,27 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
+import java.util.Map;
 import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
 public abstract class Indicator extends StreamData {
 
-    @Getter @Setter private long timeBucket;
+    protected static final String TIME_BUCKET = "time_bucket";
+
+    @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
+
+    public abstract String id();
 
     public abstract void combine(Indicator indicator);
+
+    public abstract String name();
+
+    public abstract Map<String, Object> toMap();
+
+    public abstract Indicator newOne(Map<String, Object> dbMap);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index 46f0345..d1ad273 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -28,4 +28,6 @@ import 
org.apache.skywalking.oap.server.core.remote.selector.Selector;
 @Retention(RetentionPolicy.SOURCE)
 public @interface IndicatorType {
     Selector selector();
+
+    boolean needMerge();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
index 8513fa2..b60e3f7 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
@@ -79,4 +79,8 @@ public class IndicatorMapper implements Service {
     public Class<Indicator> findClassById(int id) {
         return idKeyMapping.get(id);
     }
+
+    public Collection<Class<Indicator>> indicatorClasses() {
+        return idKeyMapping.values();
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
index 65d68b4..1fadf90 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
@@ -21,8 +21,10 @@ package 
org.apache.skywalking.oap.server.core.analysis.worker;
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
@@ -33,11 +35,14 @@ public abstract class AbstractAggregatorWorker<INPUT 
extends Indicator> extends
 
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractAggregatorWorker.class);
 
+    private Worker worker;
+    private final ModuleManager moduleManager;
     private final DataCarrier<INPUT> dataCarrier;
     private final MergeDataCache<INPUT> mergeDataCache;
     private int messageNum;
 
     public AbstractAggregatorWorker(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>(1, 10000);
         this.dataCarrier.consume(new AggregatorConsumer(this), 1);
@@ -78,7 +83,15 @@ public abstract class AbstractAggregatorWorker<INPUT extends 
Indicator> extends
         mergeDataCache.finishReadingLast();
     }
 
-    protected abstract void onNext(INPUT data);
+    private void onNext(INPUT data) {
+        if (worker == null) {
+            WorkerMapper workerMapper = 
moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+            worker = workerMapper.findInstanceByClass(nextWorkerClass());
+        }
+        worker.in(data);
+    }
+
+    public abstract Class nextWorkerClass();
 
     private void aggregate(INPUT message) {
         mergeDataCache.writing();
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
index bcead67..59f1502 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
@@ -18,18 +18,119 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+import static java.util.Objects.nonNull;
 
 /**
  * @author peng-yongsheng
  */
 public abstract class AbstractPersistentWorker<INPUT extends Indicator> 
extends Worker<INPUT> {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractPersistentWorker.class);
+
+    private final MergeDataCache<INPUT> mergeDataCache;
+    private final IBatchDAO batchDAO;
+    private final IPersistenceDAO<?, ?, INPUT> persistenceDAO;
+    private final int blockBatchPersistenceSize = 1000;
+
     public AbstractPersistentWorker(ModuleManager moduleManager) {
+        this.mergeDataCache = new MergeDataCache<>();
+        this.batchDAO = 
moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
+        this.persistenceDAO = 
moduleManager.find(StorageModule.NAME).getService(IPersistenceDAO.class);
+    }
+
+    public final Window<MergeDataCollection<INPUT>> getCache() {
+        return mergeDataCache;
     }
 
     @Override public final void in(INPUT input) {
+        if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
+            try {
+                if (getCache().trySwitchPointer()) {
+                    getCache().switchPointer();
+
+                    List<?> collection = buildBatchCollection();
+                    batchDAO.batchPersistence(collection);
+                }
+            } finally {
+                getCache().trySwitchPointerFinally();
+            }
+        }
+        cacheData(input);
+    }
+
+    public final List<?> buildBatchCollection() {
+        List<?> batchCollection = new LinkedList<>();
+        try {
+            while (getCache().getLast().isWriting()) {
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    logger.warn("thread wake up");
+                }
+            }
 
+            if (getCache().getLast().collection() != null) {
+                batchCollection = prepareBatch(getCache().getLast());
+            }
+        } finally {
+            getCache().finishReadingLast();
+        }
+        return batchCollection;
     }
+
+    private List<Object> prepareBatch(MergeDataCollection<INPUT> collection) {
+        List<Object> batchCollection = new LinkedList<>();
+        collection.collection().forEach((id, data) -> {
+            if (needMergeDBData()) {
+                INPUT dbData = null;
+                try {
+                    dbData = persistenceDAO.get(data);
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
+                }
+                if (nonNull(dbData)) {
+                    dbData.combine(data);
+                    try {
+                        
batchCollection.add(persistenceDAO.prepareBatchUpdate(dbData));
+                    } catch (Throwable t) {
+                        logger.error(t.getMessage(), t);
+                    }
+                } else {
+                    try {
+                        
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
+                    } catch (Throwable t) {
+                        logger.error(t.getMessage(), t);
+                    }
+                }
+            } else {
+                try {
+                    
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
+                }
+            }
+        });
+
+        return batchCollection;
+    }
+
+    private void cacheData(INPUT input) {
+        mergeDataCache.writing();
+        if (mergeDataCache.containsKey(input)) {
+            mergeDataCache.get(input).combine(input);
+        } else {
+            mergeDataCache.put(input);
+        }
+
+        mergeDataCache.finishWriting();
+    }
+
+    protected abstract boolean needMergeDBData();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
index 9732430..5d81b9b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
@@ -34,14 +34,12 @@ public class WorkerMapper implements Service {
     private static final Logger logger = 
LoggerFactory.getLogger(WorkerMapper.class);
 
     private int id = 0;
-    private final ModuleManager moduleManager;
     private final Map<Class<Worker>, Integer> classKeyMapping;
     private final Map<Integer, Class<Worker>> idKeyMapping;
     private final Map<Class<Worker>, Worker> classKeyInstanceMapping;
     private final Map<Integer, Worker> idKeyInstanceMapping;
 
-    public WorkerMapper(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    public WorkerMapper() {
         this.classKeyMapping = new HashMap<>();
         this.idKeyMapping = new HashMap<>();
         this.classKeyInstanceMapping = new HashMap<>();
@@ -49,7 +47,7 @@ public class WorkerMapper implements Service {
     }
 
     @SuppressWarnings(value = "unchecked")
-    public void load() throws WorkerDefineLoadException {
+    public void load(ModuleManager moduleManager) throws 
WorkerDefineLoadException {
         try {
             List<String> workerClasses = new LinkedList<>();
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java
index 24fe14d..c6be8d2 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/Endpoint.java
@@ -20,10 +20,12 @@ package org.apache.skywalking.oap.server.core.receiver;
 
 import lombok.*;
 import org.apache.skywalking.apm.network.language.agent.SpanLayer;
+import org.apache.skywalking.oap.server.core.receiver.annotation.SourceType;
 
 /**
  * @author peng-yongsheng
  */
+@SourceType
 public class Endpoint extends Source {
     @Override public Scope scope() {
         return Scope.Endpoint;
@@ -31,7 +33,9 @@ public class Endpoint extends Source {
 
     @Getter @Setter private int id;
     @Getter @Setter private String name;
+    @Getter @Setter private int serviceId;
     @Getter @Setter private String serviceName;
+    @Getter @Setter private int serviceInstanceId;
     @Getter @Setter private String serviceInstanceName;
     @Getter @Setter private int latency;
     @Getter @Setter private boolean status;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java
similarity index 81%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java
index 46f0345..2065022 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java
@@ -16,16 +16,14 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+package org.apache.skywalking.oap.server.core.receiver.annotation;
 
 import java.lang.annotation.*;
-import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 
 /**
  * @author peng-yongsheng
  */
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.SOURCE)
-public @interface IndicatorType {
-    Selector selector();
+public @interface SourceType {
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java
similarity index 75%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java
index ccd7db0..db65fd7 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java
@@ -18,20 +18,19 @@
 
 package org.apache.skywalking.oap.server.core.storage;
 
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.client.Client;
 
 /**
  * @author peng-yongsheng
  */
-public class StorageModule extends ModuleDefine {
+public abstract class AbstractDAO<C extends Client> implements DAO {
+    private final C client;
 
-    public static final String NAME = "storage";
-
-    @Override public String name() {
-        return NAME;
+    public AbstractDAO(C client) {
+        this.client = client;
     }
 
-    @Override public Class[] services() {
-        return new Class[] {};
+    public final C getClient() {
+        return client;
     }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
similarity index 82%
copy from 
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
index 7dcf14f..ff1af29 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
@@ -16,14 +16,12 @@
  *
  */
 
+package org.apache.skywalking.oap.server.core.storage;
 
-package org.apache.skywalking.apm.collector.storage.base.dao;
-
-import java.util.List;
+import org.apache.skywalking.oap.server.library.module.Service;
 
 /**
  * @author peng-yongsheng
  */
-public interface IBatchDAO extends DAO {
-    void batchPersistence(List<?> batchCollection);
+public interface DAO extends Service {
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
similarity index 93%
copy from 
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
index 7dcf14f..5c7a88a 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -16,8 +16,7 @@
  *
  */
 
-
-package org.apache.skywalking.apm.collector.storage.base.dao;
+package org.apache.skywalking.oap.server.core.storage;
 
 import java.util.List;
 
@@ -25,5 +24,6 @@ import java.util.List;
  * @author peng-yongsheng
  */
 public interface IBatchDAO extends DAO {
+
     void batchPersistence(List<?> batchCollection);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
similarity index 70%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
index bcead67..2a9d1f9 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
@@ -16,20 +16,21 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.worker;
+package org.apache.skywalking.oap.server.core.storage;
 
+import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractPersistentWorker<INPUT extends Indicator> 
extends Worker<INPUT> {
+public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator> 
extends DAO {
 
-    public AbstractPersistentWorker(ModuleManager moduleManager) {
-    }
+    INPUT get(INPUT input) throws IOException;
 
-    @Override public final void in(INPUT input) {
+    INSERT prepareBatchInsert(INPUT input) throws IOException;
 
-    }
+    UPDATE prepareBatchUpdate(INPUT input) throws IOException;
+
+    void deleteHistory(Long timeBucketBefore);
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java
similarity index 82%
copy from 
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java
index 7dcf14f..b9a4d94 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java
@@ -16,14 +16,14 @@
  *
  */
 
-
-package org.apache.skywalking.apm.collector.storage.base.dao;
-
-import java.util.List;
+package org.apache.skywalking.oap.server.core.storage;
 
 /**
  * @author peng-yongsheng
  */
-public interface IBatchDAO extends DAO {
-    void batchPersistence(List<?> batchCollection);
+public class StorageException extends Exception {
+
+    public StorageException(String message) {
+        super(message);
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java
new file mode 100644
index 0000000..9e4545c
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import 
org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import 
org.apache.skywalking.oap.server.core.storage.annotation.ColumnAnnotationRetrieval;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class StorageInstaller {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(StorageInstaller.class);
+
+    private final ModuleManager moduleManager;
+    private final ColumnAnnotationRetrieval annotationRetrieval;
+
+    public StorageInstaller(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.annotationRetrieval = new ColumnAnnotationRetrieval();
+    }
+
+    public final void install(Client client) throws StorageException {
+        IndicatorMapper indicatorMapper = 
moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
+        Collection<Class<Indicator>> indicatorClasses = 
indicatorMapper.indicatorClasses();
+
+        Boolean debug = System.getProperty("debug") != null;
+        for (Class<Indicator> indicatorClass : indicatorClasses) {
+            List<ColumnDefine> columnDefines = 
annotationRetrieval.retrieval(indicatorClass);
+
+            String tableName;
+            try {
+                tableName = indicatorClass.newInstance().name();
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new StorageException(e.getMessage());
+            }
+            TableDefine tableDefine = new TableDefine(tableName, 
columnDefines);
+
+            if (!isExists(client, tableDefine)) {
+                logger.info("table: {} not exists", tableDefine.getName());
+                createTable(client, tableDefine);
+            } else if (debug) {
+                logger.info("table: {} exists", tableDefine.getName());
+                deleteTable(client, tableDefine);
+                createTable(client, tableDefine);
+            }
+            columnCheck(client, tableDefine);
+        }
+    }
+
+    protected abstract boolean isExists(Client client, TableDefine 
tableDefine) throws StorageException;
+
+    protected abstract void columnCheck(Client client, TableDefine 
tableDefine) throws StorageException;
+
+    protected abstract void deleteTable(Client client, TableDefine 
tableDefine) throws StorageException;
+
+    protected abstract void createTable(Client client, TableDefine 
tableDefine) throws StorageException;
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index ccd7db0..559eb5c 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -32,6 +32,6 @@ public class StorageModule extends ModuleDefine {
     }
 
     @Override public Class[] services() {
-        return new Class[] {};
+        return new Class[] {IBatchDAO.class, IPersistenceDAO.class};
     }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
similarity index 79%
copy from 
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
index 7dcf14f..aa6828b 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
@@ -16,14 +16,15 @@
  *
  */
 
+package org.apache.skywalking.oap.server.core.storage.annotation;
 
-package org.apache.skywalking.apm.collector.storage.base.dao;
-
-import java.util.List;
+import java.lang.annotation.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface IBatchDAO extends DAO {
-    void batchPersistence(List<?> batchCollection);
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Column {
+    String columnName();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java
new file mode 100644
index 0000000..cf6a3dc
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.annotation;
+
+import java.lang.reflect.Field;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ColumnAnnotationRetrieval {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ColumnAnnotationRetrieval.class);
+
+    public List<ColumnDefine> retrieval(Class<Indicator> indicatorClass) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Retrieval column annotation from class {}", 
indicatorClass.getName());
+        }
+        List<ColumnDefine> columnDefines = new LinkedList<>();
+        retrieval(indicatorClass, columnDefines);
+        return columnDefines;
+    }
+
+    private void retrieval(Class clazz, List<ColumnDefine> columnDefines) {
+        Field[] fields = clazz.getDeclaredFields();
+
+        for (Field field : fields) {
+            if (field.isAnnotationPresent(Column.class)) {
+                Column column = field.getAnnotation(Column.class);
+                columnDefines.add(new ColumnDefine(new 
ColumnName(column.columnName(), column.columnName()), field.getType()));
+                if (logger.isDebugEnabled()) {
+                    logger.debug("The field named {} with the {} type", 
column.columnName(), field.getType());
+                }
+            }
+        }
+
+        if (Objects.nonNull(clazz.getSuperclass())) {
+            retrieval(clazz.getSuperclass(), columnDefines);
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java
similarity index 66%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java
index ccd7db0..9bf4a87 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java
@@ -16,22 +16,25 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.storage;
-
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+package org.apache.skywalking.oap.server.core.storage.define;
 
 /**
  * @author peng-yongsheng
  */
-public class StorageModule extends ModuleDefine {
+public class ColumnDefine {
+    private final ColumnName columnName;
+    private final Class<?> type;
 
-    public static final String NAME = "storage";
+    public ColumnDefine(ColumnName columnName, Class<?> type) {
+        this.columnName = columnName;
+        this.type = type;
+    }
 
-    @Override public String name() {
-        return NAME;
+    public final ColumnName getColumnName() {
+        return columnName;
     }
 
-    @Override public Class[] services() {
-        return new Class[] {};
+    public final Class<?> getType() {
+        return type;
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java
similarity index 63%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java
index ccd7db0..8dc8882 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java
@@ -16,22 +16,26 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.storage;
-
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+package org.apache.skywalking.oap.server.core.storage.define;
 
 /**
  * @author peng-yongsheng
  */
-public class StorageModule extends ModuleDefine {
+public class ColumnName {
+    private final String fullName;
+    private final String shortName;
+    private boolean useShortName = false;
 
-    public static final String NAME = "storage";
+    public ColumnName(String fullName, String shortName) {
+        this.fullName = fullName;
+        this.shortName = shortName;
+    }
 
-    @Override public String name() {
-        return NAME;
+    public String getName() {
+        return useShortName ? shortName : fullName;
     }
 
-    @Override public Class[] services() {
-        return new Class[] {};
+    public void useShortName() {
+        this.useShortName = true;
     }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java
similarity index 82%
copy from 
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java
index 7dcf14f..3a6c3a8 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java
@@ -16,14 +16,12 @@
  *
  */
 
-
-package org.apache.skywalking.apm.collector.storage.base.dao;
-
-import java.util.List;
+package org.apache.skywalking.oap.server.core.storage.define;
 
 /**
  * @author peng-yongsheng
  */
-public interface IBatchDAO extends DAO {
-    void batchPersistence(List<?> batchCollection);
+public interface ColumnTypeMapping {
+
+    String transform(Class<?> type);
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java
similarity index 64%
copy from 
apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java
index 7dcf14f..e0f59ff 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java
@@ -16,14 +16,27 @@
  *
  */
 
-
-package org.apache.skywalking.apm.collector.storage.base.dao;
+package org.apache.skywalking.oap.server.core.storage.define;
 
 import java.util.List;
 
 /**
  * @author peng-yongsheng
  */
-public interface IBatchDAO extends DAO {
-    void batchPersistence(List<?> batchCollection);
+public class TableDefine {
+    private final String name;
+    private final List<ColumnDefine> columnDefines;
+
+    public TableDefine(String name, List<ColumnDefine> columnDefines) {
+        this.name = name;
+        this.columnDefines = columnDefines;
+    }
+
+    public final String getName() {
+        return name;
+    }
+
+    public final List<ColumnDefine> getColumnDefines() {
+        return columnDefines;
+    }
 }
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
index 17d1189..9865ff3 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -18,8 +18,9 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator.define;
 
+import java.util.Map;
 import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.*;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
@@ -35,4 +36,20 @@ public class TestAvgIndicator extends AvgIndicator {
 
     @Override public void deserialize(RemoteData remoteData) {
     }
+
+    @Override public String id() {
+        return null;
+    }
+
+    @Override public String name() {
+        return null;
+    }
+
+    @Override public Map<String, Object> toMap() {
+        return null;
+    }
+
+    @Override public Indicator newOne(Map<String, Object> dbMap) {
+        return null;
+    }
 }
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
new file mode 100644
index 0000000..533c1be
--- /dev/null
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.LinkedList;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
+import org.apache.skywalking.oap.server.core.storage.define.TableDefine;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StorageInstallerTestCase {
+
+    @Test
+    public void testInstall() throws StorageException, 
DuplicateProviderException, ServiceNotProvidedException, 
IndicatorDefineLoadException {
+        IndicatorMapper indicatorMapper = new IndicatorMapper();
+        CoreModuleProvider moduleProvider = 
Mockito.mock(CoreModuleProvider.class);
+        CoreModule moduleDefine = Mockito.spy(CoreModule.class);
+        ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
+
+        LinkedList<ModuleProvider> moduleProviders = 
Whitebox.getInternalState(moduleDefine, "loadedProviders");
+        moduleProviders.add(moduleProvider);
+
+        
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
+        
Mockito.when(moduleProvider.getService(IndicatorMapper.class)).thenReturn(indicatorMapper);
+
+        indicatorMapper.load();
+
+        TestStorageInstaller installer = new 
TestStorageInstaller(moduleManager);
+        installer.install(null);
+    }
+
+    class TestStorageInstaller extends StorageInstaller {
+
+        public TestStorageInstaller(ModuleManager moduleManager) {
+            super(moduleManager);
+        }
+
+        @Override protected boolean isExists(Client client, TableDefine 
tableDefine) throws StorageException {
+            return false;
+        }
+
+        @Override protected void columnCheck(Client client, TableDefine 
tableDefine) throws StorageException {
+
+        }
+
+        @Override protected void deleteTable(Client client, TableDefine 
tableDefine) throws StorageException {
+
+        }
+
+        @Override protected void createTable(Client client, TableDefine 
tableDefine) throws StorageException {
+
+        }
+    }
+}
diff --git a/oap-server/server-library/library-client/pom.xml 
b/oap-server/server-library/library-client/pom.xml
index 7ceece8..98fa2de 100644
--- a/oap-server/server-library/library-client/pom.xml
+++ b/oap-server/server-library/library-client/pom.xml
@@ -49,7 +49,7 @@
         </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-client</artifactId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
new file mode 100644
index 0000000..520e3f0
--- /dev/null
+++ 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.*;
+import org.elasticsearch.action.admin.indices.create.*;
+import org.elasticsearch.action.admin.indices.delete.*;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClient implements Client {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchClient.class);
+
+    private static final String TYPE = "type";
+    private final String clusterNodes;
+    private final NameSpace namespace;
+    private RestHighLevelClient client;
+
+    public ElasticSearchClient(String clusterNodes, NameSpace namespace) {
+        this.clusterNodes = clusterNodes;
+        this.namespace = namespace;
+    }
+
+    @Override public void initialize() {
+        List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
+
+        client = new RestHighLevelClient(
+            RestClient.builder(pairsList.toArray(new HttpHost[0])));
+    }
+
+    @Override public void shutdown() {
+        try {
+            client.close();
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private List<HttpHost> parseClusterNodes(String nodes) {
+        List<HttpHost> httpHosts = new LinkedList<>();
+        logger.info("elasticsearch cluster nodes: {}", nodes);
+        String[] nodesSplit = nodes.split(",");
+        for (String node : nodesSplit) {
+            String host = node.split(":")[0];
+            String port = node.split(":")[1];
+            httpHosts.add(new HttpHost(host, Integer.valueOf(port)));
+        }
+
+        return httpHosts;
+    }
+
+    public boolean createIndex(String indexName, Settings settings,
+        XContentBuilder mappingBuilder) throws IOException {
+        indexName = formatIndexName(indexName);
+        CreateIndexRequest request = new CreateIndexRequest(indexName);
+        request.settings(settings);
+        request.mapping(TYPE, mappingBuilder);
+        CreateIndexResponse response;
+        response = client.indices().create(request);
+        logger.info("create {} index finished, isAcknowledged: {}", indexName, 
response.isAcknowledged());
+        return response.isAcknowledged();
+    }
+
+    public boolean deleteIndex(String indexName) throws IOException {
+        indexName = formatIndexName(indexName);
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        DeleteIndexResponse response;
+        response = client.indices().delete(request);
+        logger.info("delete {} index finished, isAcknowledged: {}", indexName, 
response.isAcknowledged());
+        return response.isAcknowledged();
+    }
+
+    public boolean isExistsIndex(String indexName) throws IOException {
+        indexName = formatIndexName(indexName);
+        GetIndexRequest request = new GetIndexRequest();
+        request.indices(indexName);
+        return client.indices().exists(request);
+    }
+
+    public SearchResponse search(String indexName, SearchSourceBuilder 
searchSourceBuilder) throws IOException {
+        indexName = formatIndexName(indexName);
+        SearchRequest searchRequest = new SearchRequest(indexName);
+        searchRequest.types(TYPE);
+        searchRequest.source(searchSourceBuilder);
+        return client.search(searchRequest);
+    }
+
+    public GetResponse get(String indexName, String id) throws IOException {
+        indexName = formatIndexName(indexName);
+        GetRequest request = new GetRequest(indexName, TYPE, id);
+        return client.get(request);
+    }
+
+    public IndexRequest prepareInsert(String indexName, String id, 
XContentBuilder source) {
+        indexName = formatIndexName(indexName);
+        return new IndexRequest(indexName, TYPE, id).source(source);
+    }
+
+    public UpdateRequest prepareUpdate(String indexName, String id, 
XContentBuilder source) {
+        indexName = formatIndexName(indexName);
+        return new UpdateRequest(indexName, TYPE, id).doc(source);
+    }
+
+    public void delete(String indexName, String timeBucketColumnName, long 
startTimeBucket,
+        long endTimeBucket) throws IOException {
+        indexName = formatIndexName(indexName);
+        Map<String, String> params = Collections.singletonMap("pretty", 
"true");
+        String jsonString = "{" +
+            "  \"query\": {" +
+            "    \"range\": {" +
+            "      \"" + timeBucketColumnName + "\": {" +
+            "        \"gte\": " + startTimeBucket + "," +
+            "        \"lte\": " + endTimeBucket + "" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}";
+        HttpEntity entity = new NStringEntity(jsonString, 
ContentType.APPLICATION_JSON);
+        client.getLowLevelClient().performRequest("POST", "/" + indexName + 
"/_delete_by_query", params, entity);
+    }
+
+    private String formatIndexName(String indexName) {
+        if (Objects.nonNull(namespace) && 
StringUtils.isNotEmpty(namespace.getNameSpace())) {
+            return namespace.getNameSpace() + "_" + indexName;
+        }
+        return indexName;
+    }
+
+    public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, 
int flushInterval,
+        int concurrentRequests) {
+        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request) {
+
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request,
+                BulkResponse response) {
+
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, 
Throwable failure) {
+                logger.error("{} data bulk failed, reason: {}", 
request.numberOfActions(), failure);
+            }
+        };
+
+        return BulkProcessor.builder(client::bulkAsync, listener)
+            .setBulkActions(bulkActions)
+            .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
+            .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
+            .setConcurrentRequests(concurrentRequests)
+            
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),
 3))
+            .build();
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
similarity index 68%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
copy to 
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
index ccd7db0..bb71d07 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ 
b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
@@ -16,22 +16,20 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.storage;
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.client.ClientException;
 
 /**
  * @author peng-yongsheng
  */
-public class StorageModule extends ModuleDefine {
+public class ElasticSearchClientException extends ClientException {
 
-    public static final String NAME = "storage";
-
-    @Override public String name() {
-        return NAME;
+    public ElasticSearchClientException(String message) {
+        super(message);
     }
 
-    @Override public Class[] services() {
-        return new Class[] {};
+    public ElasticSearchClientException(String message, Throwable cause) {
+        super(message, cause);
     }
 }
diff --git 
a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
 
b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
new file mode 100644
index 0000000..f696202
--- /dev/null
+++ 
b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.library.client.ClientException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.*;
+import org.junit.Assert;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClientTestCase {
+
+    public static void main(String[] args) throws IOException, ClientException 
{
+        Settings settings = Settings.builder()
+            .put("number_of_shards", 2)
+            .put("number_of_replicas", 0)
+            .build();
+
+        XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject()
+            .startObject("_all")
+            .field("enabled", false)
+            .endObject()
+            .startObject("properties")
+            .startObject("column1")
+            .field("type", "text")
+            .endObject()
+            .endObject();
+        builder.endObject();
+
+        ElasticSearchClient client = new ElasticSearchClient("localhost:9200", 
null);
+        client.initialize();
+
+        String indexName = "test";
+        client.createIndex(indexName, settings, builder);
+        Assert.assertTrue(client.isExistsIndex(indexName));
+        client.deleteIndex(indexName);
+        Assert.assertFalse(client.isExistsIndex(indexName));
+
+
+        client.shutdown();
+    }
+}
diff --git 
a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
 
b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
index 74c908d..5278215 100644
--- 
a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
+++ 
b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.oap.server.library.module;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.lang.reflect.Field;
 import java.util.*;
+import org.slf4j.*;
 
 /**
  * A module definition.
@@ -31,7 +29,7 @@ import java.util.*;
  */
 public abstract class ModuleDefine {
 
-    private final Logger logger = LoggerFactory.getLogger(ModuleDefine.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(ModuleDefine.class);
 
     private LinkedList<ModuleProvider> loadedProviders = new LinkedList<>();
 
@@ -128,7 +126,7 @@ public abstract class ModuleDefine {
         return loadedProviders;
     }
 
-    final ModuleProvider provider() throws DuplicateProviderException {
+    public final ModuleProvider provider() throws DuplicateProviderException {
         if (loadedProviders.size() > 1) {
             throw new DuplicateProviderException(this.name() + " module exist 
" + loadedProviders.size() + " providers");
         }
diff --git a/oap-server/server-starter/src/main/resources/application.yml 
b/oap-server/server-starter/src/main/resources/application.yml
index cabf8a2..46c3535 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -35,6 +35,20 @@ core:
     gRPCPort: 11800
 storage:
   elasticsearch:
+    clusterNodes: localhost:9200
+    indexShardsNumber: 2
+    indexReplicasNumber: 0
+    # Batch process setting, refer to 
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+    bulkActions: 2000 # Execute the bulk every 2000 requests
+    bulkSize: 20 # flush the bulk every 20mb
+    flushInterval: 10 # flush the bulk every 10 seconds whatever the number of 
requests
+    concurrentRequests: 2 # the number of concurrent requests
+    # Set a timeout on metric data. After the timeout has expired, the metric 
data will automatically be deleted.
+    traceDataTTL: 90 # Unit is minute
+    minuteMetricDataTTL: 90 # Unit is minute
+    hourMetricDataTTL: 36 # Unit is hour
+    dayMetricDataTTL: 45 # Unit is day
+    monthMetricDataTTL: 18 # Unit is month
 service-mesh:
   default:
 query:
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
index e0adc43..6c93cb6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
@@ -36,5 +36,10 @@
             <artifactId>server-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>library-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 92768ad..09b0bb2 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
 
+import lombok.*;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 
 /**
@@ -25,6 +26,8 @@ import 
org.apache.skywalking.oap.server.library.module.ModuleConfig;
  */
 public class StorageModuleElasticsearchConfig extends ModuleConfig {
 
+    @Setter @Getter private String nameSpace;
+    @Setter @Getter private String clusterNodes;
     private int indexShardsNumber;
     private int indexReplicasNumber;
     private boolean highPerformanceMode;
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 23367a5..b4aa7e8 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -18,8 +18,11 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
 
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.library.client.NameSpace;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
 import org.slf4j.*;
 
 /**
@@ -29,11 +32,14 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
 
     private static final Logger logger = 
LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
 
-    private final StorageModuleElasticsearchConfig storageConfig;
+    private final StorageModuleElasticsearchConfig config;
+    private final NameSpace nameSpace;
+    private ElasticSearchClient elasticSearchClient;
 
     public StorageModuleElasticsearchProvider() {
         super();
-        this.storageConfig = new StorageModuleElasticsearchConfig();
+        this.config = new StorageModuleElasticsearchConfig();
+        this.nameSpace = new NameSpace();
     }
 
     @Override
@@ -42,21 +48,34 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
     }
 
     @Override
-    public Class module() {
+    public Class<? extends ModuleDefine> module() {
         return StorageModule.class;
     }
 
     @Override
     public ModuleConfig createConfigBeanIfAbsent() {
-        return storageConfig;
+        return config;
     }
 
     @Override
     public void prepare() throws ServiceNotProvidedException {
+        elasticSearchClient = new 
ElasticSearchClient(config.getClusterNodes(), nameSpace);
+
+        this.registerServiceImplementation(IBatchDAO.class, new 
BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), 
config.getBulkSize(), config.getFlushInterval(), 
config.getConcurrentRequests()));
+        this.registerServiceImplementation(IPersistenceDAO.class, new 
PersistenceEsDAO(elasticSearchClient, nameSpace));
     }
 
     @Override
     public void start() throws ModuleStartException {
+        try {
+            nameSpace.setNameSpace(config.getNameSpace());
+            elasticSearchClient.initialize();
+
+            StorageEsInstaller installer = new 
StorageEsInstaller(getManager(), config.getIndexShardsNumber(), 
config.getIndexReplicasNumber());
+            installer.install(elasticSearchClient);
+        } catch (StorageException e) {
+            throw new ModuleStartException(e.getMessage(), e);
+        }
     }
 
     @Override
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
new file mode 100644
index 0000000..0e9ff2f
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BatchProcessEsDAO.class);
+
+    private BulkProcessor bulkProcessor;
+    private final int bulkActions;
+    private final int bulkSize;
+    private final int flushInterval;
+    private final int concurrentRequests;
+
+    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int 
bulkSize, int flushInterval,
+        int concurrentRequests) {
+        super(client);
+        this.bulkActions = bulkActions;
+        this.bulkSize = bulkSize;
+        this.flushInterval = flushInterval;
+        this.concurrentRequests = concurrentRequests;
+    }
+
+    @Override public void batchPersistence(List<?> batchCollection) {
+        if (bulkProcessor == null) {
+            this.bulkProcessor = getClient().createBulkProcessor(bulkActions, 
bulkSize, flushInterval, concurrentRequests);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("bulk data size: {}", batchCollection.size());
+        }
+
+        if (CollectionUtils.isNotEmpty(batchCollection)) {
+            batchCollection.forEach(builder -> {
+                if (builder instanceof IndexRequest) {
+                    this.bulkProcessor.add((IndexRequest)builder);
+                }
+                if (builder instanceof UpdateRequest) {
+                    this.bulkProcessor.add((UpdateRequest)builder);
+                }
+            });
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
similarity index 52%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
copy to 
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index bcead67..8e268c1 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -16,20 +16,26 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.worker;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.core.storage.define.ColumnTypeMapping;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractPersistentWorker<INPUT extends Indicator> 
extends Worker<INPUT> {
-
-    public AbstractPersistentWorker(ModuleManager moduleManager) {
-    }
-
-    @Override public final void in(INPUT input) {
+public class ColumnTypeEsMapping implements ColumnTypeMapping {
 
+    @Override public String transform(Class<?> type) {
+        if (Integer.class.equals(type) || int.class.equals(type)) {
+            return "integer";
+        } else if (Long.class.equals(type) || long.class.equals(type)) {
+            return "long";
+        } else if (Double.class.equals(type) || double.class.equals(type)) {
+            return "double";
+        } else if (String.class.equals(type)) {
+            return "text";
+        } else {
+            throw new IllegalArgumentException("Unsupported data type: " + 
type.getName());
+        }
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
new file mode 100644
index 0000000..dd0a70b
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
+
+    private static final Logger logger = LoggerFactory.getLogger(EsDAO.class);
+
+    public EsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    protected final int getMaxId(String indexName, String columnName) {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        
searchSourceBuilder.aggregation(AggregationBuilders.max("agg").field(columnName));
+        searchSourceBuilder.size(0);
+        return getResponse(indexName, searchSourceBuilder);
+    }
+
+    protected final int getMinId(String indexName, String columnName) {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        
searchSourceBuilder.aggregation(AggregationBuilders.min("agg").field(columnName));
+        searchSourceBuilder.size(0);
+        return getResponse(indexName, searchSourceBuilder);
+    }
+
+    private int getResponse(String indexName, SearchSourceBuilder 
searchSourceBuilder) {
+        try {
+            SearchResponse searchResponse = getClient().search(indexName, 
searchSourceBuilder);
+            Max agg = searchResponse.getAggregations().get("agg");
+
+            int id = (int)agg.getValue();
+            if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
+                return 0;
+            } else {
+                return id;
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        return 0;
+    }
+}
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
new file mode 100644
index 0000000..1f83b63
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.IPersistenceDAO;
+import org.apache.skywalking.oap.server.library.client.NameSpace;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class PersistenceEsDAO implements IPersistenceDAO<IndexRequest, 
UpdateRequest, Indicator> {
+
+    private final ElasticSearchClient client;
+    private final NameSpace nameSpace;
+
+    public PersistenceEsDAO(ElasticSearchClient client, NameSpace nameSpace) {
+        this.client = client;
+        this.nameSpace = nameSpace;
+    }
+
+    @Override public Indicator get(Indicator input) throws IOException {
+        GetResponse response = client.get(nameSpace.getNameSpace() + "_" + 
input.name(), input.id());
+        if (response.isExists()) {
+            return input.newOne(response.getSource());
+        } else {
+            return null;
+        }
+    }
+
+    @Override public IndexRequest prepareBatchInsert(Indicator input) throws 
IOException {
+        Map<String, Object> objectMap = input.toMap();
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+        for (String key : objectMap.keySet()) {
+            builder.field(key, objectMap.get(key));
+        }
+        builder.endObject();
+        return client.prepareInsert(nameSpace.getNameSpace() + "_" + 
input.name(), input.id(), builder);
+    }
+
+    @Override public UpdateRequest prepareBatchUpdate(Indicator input) throws 
IOException {
+        Map<String, Object> objectMap = input.toMap();
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+        for (String key : objectMap.keySet()) {
+            builder.field(key, objectMap.get(key));
+        }
+        builder.endObject();
+        return client.prepareUpdate(nameSpace.getNameSpace() + "_" + 
input.name(), input.id(), builder);
+    }
+
+    @Override public void deleteHistory(Long timeBucketBefore) {
+
+    }
+}
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
new file mode 100644
index 0000000..07868d8
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.apache.skywalking.oap.server.library.client.Client;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StorageEsInstaller extends StorageInstaller {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(StorageEsInstaller.class);
+
+    private final int indexShardsNumber;
+    private final int indexReplicasNumber;
+    private final ColumnTypeEsMapping mapping;
+
+    public StorageEsInstaller(ModuleManager moduleManager, int 
indexShardsNumber, int indexReplicasNumber) {
+        super(moduleManager);
+        this.indexShardsNumber = indexShardsNumber;
+        this.indexReplicasNumber = indexReplicasNumber;
+        this.mapping = new ColumnTypeEsMapping();
+    }
+
+    @Override protected boolean isExists(Client client, TableDefine 
tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+        try {
+            return esClient.isExistsIndex(tableDefine.getName());
+        } catch (IOException e) {
+            throw new StorageException(e.getMessage());
+        }
+    }
+
+    @Override protected void columnCheck(Client client, TableDefine 
tableDefine) throws StorageException {
+
+    }
+
+    @Override protected void deleteTable(Client client, TableDefine 
tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+        try {
+            if (!esClient.deleteIndex(tableDefine.getName())) {
+                throw new StorageException(tableDefine.getName() + " index 
delete failure.");
+            }
+        } catch (IOException e) {
+            throw new StorageException(tableDefine.getName() + " index delete 
failure.");
+        }
+    }
+
+    @Override protected void createTable(Client client, TableDefine 
tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+        // mapping
+        XContentBuilder mappingBuilder = null;
+
+        Settings settings = createSettingBuilder();
+        try {
+            mappingBuilder = createMappingBuilder(tableDefine);
+            logger.info("mapping builder str: {}", 
mappingBuilder.prettyPrint());
+        } catch (Exception e) {
+            logger.error("create {} index mapping builder error", 
tableDefine.getName());
+        }
+
+        boolean isAcknowledged;
+        try {
+            isAcknowledged = esClient.createIndex(tableDefine.getName(), 
settings, mappingBuilder);
+        } catch (IOException e) {
+            throw new StorageException(e.getMessage());
+        }
+        logger.info("create {} index finished, isAcknowledged: {}", 
tableDefine.getName(), isAcknowledged);
+
+        if (!isAcknowledged) {
+            throw new StorageException("create " + tableDefine.getName() + " 
index failure, ");
+        }
+    }
+
+    private Settings createSettingBuilder() {
+        return Settings.builder()
+            .put("index.number_of_shards", indexShardsNumber)
+            .put("index.number_of_replicas", indexReplicasNumber)
+            .put("index.refresh_interval", "3s")
+            .put("analysis.analyzer.collector_analyzer.type", "stop")
+            .build();
+    }
+
+    private XContentBuilder createMappingBuilder(TableDefine tableDefine) 
throws IOException {
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
+            .startObject()
+            .startObject("_all")
+            .field("enabled", false)
+            .endObject()
+            .startObject("properties");
+
+        for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
+            mappingBuilder
+                .startObject(columnDefine.getColumnName().getName())
+                .field("type", mapping.transform(columnDefine.getType()))
+                .endObject();
+        }
+
+        mappingBuilder
+            .endObject()
+            .endObject();
+
+        logger.debug("create elasticsearch index: {}", 
mappingBuilder.prettyPrint());
+
+        return mappingBuilder;
+    }
+}
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
similarity index 53%
copy from 
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
copy to 
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
index 17d1189..2553ab7 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
@@ -16,23 +16,28 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
-import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.junit.*;
 
 /**
  * @author peng-yongsheng
  */
-public class TestAvgIndicator extends AvgIndicator {
+public class ElasticSearchColumnTypeMappingTestCase {
 
-    @Setter @Getter private int id;
+    @Test
+    public void test() {
+        ColumnTypeEsMapping mapping = new ColumnTypeEsMapping();
 
-    @Override public RemoteData.Builder serialize() {
-        return null;
-    }
+        Assert.assertEquals("integer", mapping.transform(int.class));
+        Assert.assertEquals("integer", mapping.transform(Integer.class));
+
+        Assert.assertEquals("long", mapping.transform(long.class));
+        Assert.assertEquals("long", mapping.transform(Long.class));
+
+        Assert.assertEquals("double", mapping.transform(double.class));
+        Assert.assertEquals("double", mapping.transform(Double.class));
 
-    @Override public void deserialize(RemoteData remoteData) {
+        Assert.assertEquals("text", mapping.transform(String.class));
     }
 }

Reply via email to