This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 50a0bcb  Optimize L1 aggregation, enhance gRPC client between L1 and 
L2 aggregation. (#7206)
50a0bcb is described below

commit 50a0bcbf7395c49029759a8cb1208271405cdcc4
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Wed Jun 30 19:14:20 2021 +0800

    Optimize L1 aggregation, enhance gRPC client between L1 and L2 aggregation. 
(#7206)
    
    1.0Performance: Add L1 aggregation flush period, which reduces the CPU load 
and helps young GC.
    
    2. Replace do not direct send after the first aggregation to reduce the 
network #6400.
    
    3. Enhance the DataCarrier to notify the consumer in no enqueue event in 
short term.
    
    4. L1 aggregation flush period still works even no further metrics 
generated, powered by <3>
    
    5. Fix gRPC remote client OOM. The concurrency control mechanism failed.
---
 CHANGES.md                                         |  5 +-
 .../datacarrier/consumer/ConsumerThread.java       |  1 +
 .../commons/datacarrier/consumer/IConsumer.java    |  8 +++
 .../consumer/MultipleChannelsConsumer.java         |  3 +-
 docs/en/setup/backend/configuration-vocabulary.md  |  1 +
 .../src/main/resources/application.yml             |  2 +
 .../oap/server/core/CoreModuleConfig.java          |  9 ++-
 .../oap/server/core/CoreModuleProvider.java        |  3 +-
 .../analysis/worker/MetricsAggregateWorker.java    | 37 +++++++---
 .../analysis/worker/MetricsStreamProcessor.java    | 15 ++++-
 .../core/remote/client/GRPCRemoteClient.java       | 78 +++++++++++++---------
 11 files changed, 114 insertions(+), 48 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8ae7936..ef03150 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,8 @@ Release Notes.
 
 * Extract dependency management to a bom.
 * Add JDK 16 to test matrix.
+* DataCarrier consumer add a new event notification, call `nothingToConsume` 
method if the queue has no element to
+  consume.
 
 #### Java Agent
 
@@ -56,7 +58,8 @@ Release Notes.
 * Performance: cache regex pattern and result, optimize string concatenation 
in Envy ALS analyzer.
 * Performance: cache metrics id and entity id in `Metrics` and `ISource`.
 * Performance: enhance persistent session mechanism, about differentiating 
cache timeout for different dimensionality
-  metrics. The timeout of the cache for minute and hour level metrics has been 
prolonged to ~5 min. 
+  metrics. The timeout of the cache for minute and hour level metrics has been 
prolonged to ~5 min.
+* Performance: Add L1 aggregation flush period, which reduce the CPU load and 
help young GC.
 
 #### UI
 
diff --git 
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
 
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
index 5f35408..122f37e 100644
--- 
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
+++ 
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
@@ -80,6 +80,7 @@ public class ConsumerThread<T> extends Thread {
             }
             return true;
         }
+        consumer.nothingToConsume();
         return false;
     }
 
diff --git 
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
 
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
index 3c6dc0d..07793eb 100644
--- 
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
+++ 
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
@@ -28,4 +28,12 @@ public interface IConsumer<T> {
     void onError(List<T> data, Throwable t);
 
     void onExit();
+
+    /**
+     * Notify the implementation, if there is nothing fetched from the queue. 
This could be used as a timer to trigger
+     * reaction if the queue has no element.
+     */
+    default void nothingToConsume() {
+        return;
+    }
 }
diff --git 
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
 
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index c65b8e8..953a21e 100644
--- 
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ 
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -85,6 +85,7 @@ public class MultipleChannelsConsumer extends Thread {
             }
             return true;
         }
+        target.consumer.nothingToConsume();
         return false;
     }
 
@@ -112,7 +113,7 @@ public class MultipleChannelsConsumer extends Thread {
     }
 
     private static class Group {
-        private  Channels channels;
+        private Channels channels;
         private IConsumer consumer;
 
         public Group(Channels channels, IConsumer consumer) {
diff --git a/docs/en/setup/backend/configuration-vocabulary.md 
b/docs/en/setup/backend/configuration-vocabulary.md
index c435c8f..3fa9376 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -23,6 +23,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. 
**Receiver** mode
 | - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit 
is minute. Execution doesn't mean deleting data. The storage provider could 
override this, such as ElasticSearch 
storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5|
 | - | - | recordDataTTL|The lifecycle of record data. Record data includes 
traces, top n sampled records, and logs. Unit is day. Minimal value is 
2.|SW_CORE_RECORD_DATA_TTL|3|
 | - | - | metricsDataTTL|The lifecycle of metrics data, including the 
metadata. Unit is day. Recommend metricsDataTTL >= recordDataTTL. Minimal value 
is 2.| SW_CORE_METRICS_DATA_TTL|7|
+| - | - | l1FlushPeriod| The period of L1 aggregation flush to L2 aggregation. 
Unit is ms. | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 |
 | - | - | enableDatabaseSession|Cache metrics data for 1 minute to reduce 
database queries, and if the OAP cluster changes within that 
minute.|SW_CORE_ENABLE_DATABASE_SESSION|true|
 | - | - | topNReportPeriod|The execution period of top N sampler, which saves 
sampled data into the storage. Unit is minute|SW_CORE_TOPN_REPORT_PERIOD|10|
 | - | - | activeExtraModelColumns|Append the names of entity, such as service 
name, into the metrics storage 
entities.|SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS|false|
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml 
b/oap-server/server-bootstrap/src/main/resources/application.yml
index e6db202..2f888fd 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -85,6 +85,8 @@ core:
     dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How 
often the data keeper executor runs periodically, unit is minute
     recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:3} # Unit is day
     metricsDataTTL: ${SW_CORE_METRICS_DATA_TTL:7} # Unit is day
+    # The period of L1 aggregation flush to L2 aggregation. Unit is ms.
+    l1FlushPeriod: ${SW_CORE_L1_AGGREGATION_FLUSH_PERIOD:500}
     # Cache metrics data for 1 minute to reduce database queries, and if the 
OAP cluster changes within that minute,
     # the metrics may not be accurate within that minute.
     enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 8ffcc56..af5f735 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -46,8 +46,15 @@ public class CoreModuleConfig extends ModuleConfig {
     private String gRPCSslTrustedCAPath;
     private int maxConcurrentCallsPerConnection;
     private int maxMessageSize;
-    private boolean enableDatabaseSession;
     private int topNReportPeriod;
+    /**
+     * The period of L1 aggregation flush. Unit is ms.
+     */
+    private long l1FlushPeriod = 500;
+    /**
+     * Enable database flush session.
+     */
+    private boolean enableDatabaseSession;
     private final List<String> downsampling;
     /**
      * The period of doing data persistence. Unit is second.
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9e99bd1..a8bbcbe 100755
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -46,9 +46,9 @@ import 
org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
 import 
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
 import org.apache.skywalking.oap.server.core.config.NamingControl;
-import 
org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
 import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
 import 
org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher;
+import 
org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
 import 
org.apache.skywalking.oap.server.core.management.ui.template.UITemplateInitializer;
 import 
org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
 import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine;
@@ -289,6 +289,7 @@ public class CoreModuleProvider extends ModuleProvider {
             UITemplateManagementService.class, new 
UITemplateManagementService(getManager()));
 
         
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
+        
MetricsStreamProcessor.getInstance().setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
         
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
         apdexThresholdConfig = new ApdexThresholdConfig(this);
         ApdexMetrics.setDICT(apdexThresholdConfig);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index b6fd9b3..4e7c59e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -42,13 +42,15 @@ import 
org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
  */
 @Slf4j
 public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
+    public final long l1FlushPeriod;
     private AbstractWorker<Metrics> nextWorker;
     private final DataCarrier<Metrics> dataCarrier;
     private final MergableBufferedData<Metrics> mergeDataCache;
     private CounterMetrics aggregationCounter;
+    private long lastSendTime = 0;
 
     MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, 
AbstractWorker<Metrics> nextWorker,
-                           String modelName) {
+                           String modelName, long l1FlushPeriod) {
         super(moduleDefineHolder);
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergableBufferedData();
@@ -69,8 +71,10 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
                                                           
.getService(MetricsCreator.class);
         aggregationCounter = metricsCreator.createCounter(
             "metrics_aggregation", "The number of rows in aggregation",
-            new MetricsTag.Keys("metricName", "level", "dimensionality"), new 
MetricsTag.Values(modelName, "1", "minute")
+            new MetricsTag.Keys("metricName", "level", "dimensionality"),
+            new MetricsTag.Values(modelName, "1", "minute")
         );
+        this.l1FlushPeriod = l1FlushPeriod;
     }
 
     /**
@@ -93,14 +97,22 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             mergeDataCache.accept(metrics);
         });
 
-        mergeDataCache.read().forEach(
-            data -> {
-                if (log.isDebugEnabled()) {
-                    log.debug(data.toString());
+        flush();
+    }
+
+    private void flush() {
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - lastSendTime > l1FlushPeriod) {
+            mergeDataCache.read().forEach(
+                data -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug(data.toString());
+                    }
+                    nextWorker.in(data);
                 }
-                nextWorker.in(data);
-            }
-        );
+            );
+            lastSendTime = currentTime;
+        }
     }
 
     private class AggregatorConsumer implements IConsumer<Metrics> {
@@ -121,5 +133,10 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
         @Override
         public void onExit() {
         }
+
+        @Override
+        public void nothingToConsume() {
+            flush();
+        }
     }
-}
+}
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 9d8faf7..3a40777 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -71,6 +71,12 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
     private List<MetricsPersistentWorker> persistentWorkers = new 
ArrayList<>();
 
     /**
+     * The period of L1 aggregation flush. Unit is ms.
+     */
+    @Setter
+    @Getter
+    private long l1FlushPeriod = 500;
+    /**
      * Hold and forward CoreModuleConfig#enableDatabaseSession to the 
persistent worker.
      */
     @Setter
@@ -97,7 +103,9 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
      * @param metricsClass       data type of the streaming calculation.
      */
     @Override
-    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, 
Class<? extends Metrics> metricsClass) throws StorageException {
+    public void create(ModuleDefineHolder moduleDefineHolder,
+                       Stream stream,
+                       Class<? extends Metrics> metricsClass) throws 
StorageException {
         this.create(moduleDefineHolder, StreamDefinition.from(stream), 
metricsClass);
     }
 
@@ -108,7 +116,8 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         final StorageBuilderFactory storageBuilderFactory = 
moduleDefineHolder.find(StorageModule.NAME)
                                                                               
.provider()
                                                                               
.getService(StorageBuilderFactory.class);
-        final Class<? extends StorageBuilder> builder = 
storageBuilderFactory.builderOf(metricsClass, stream.getBuilder());
+        final Class<? extends StorageBuilder> builder = 
storageBuilderFactory.builderOf(
+            metricsClass, stream.getBuilder());
 
         StorageDAO storageDAO = 
moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IMetricsDAO metricsDAO;
@@ -167,7 +176,7 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
 
         MetricsRemoteWorker remoteWorker = new 
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
         MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
-            moduleDefineHolder, remoteWorker, stream.getName());
+            moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);
 
         entryWorkers.put(metricsClass, aggregateWorker);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 59a89b9..51fda20 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
@@ -37,17 +38,13 @@ import 
org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This is a wrapper of the gRPC client for sending message to each other OAP 
server. It contains a block queue to
  * buffering the message and sending the message by batch.
  */
+@Slf4j
 public class GRPCRemoteClient implements RemoteClient {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GRPCRemoteClient.class);
-
     private final int channelSize;
     private final int bufferSize;
     private final Address address;
@@ -76,13 +73,23 @@ public class GRPCRemoteClient implements RemoteClient {
         remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME)
                                              .provider()
                                              .getService(MetricsCreator.class)
-                                             
.createCounter("remote_out_count", "The number(client side) of inside remote 
inside aggregate rpc.", new MetricsTag.Keys("dest", "self"), new 
MetricsTag.Values(address
-                                                                               
                                                                                
                                                  .toString(), "N"));
+                                             .createCounter(
+                                                 "remote_out_count",
+                                                 "The number(client side) of 
inside remote inside aggregate rpc.",
+                                                 new MetricsTag.Keys("dest", 
"self"), new MetricsTag.Values(
+                                                     address
+                                                         .toString(), "N")
+                                             );
         remoteOutErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME)
                                                   .provider()
                                                   
.getService(MetricsCreator.class)
-                                                  
.createCounter("remote_out_error_count", "The error number(client side) of 
inside remote inside aggregate rpc.", new MetricsTag.Keys("dest", "self"), new 
MetricsTag.Values(address
-                                                                               
                                                                                
                                                                   .toString(), 
"N"));
+                                                  .createCounter(
+                                                      "remote_out_error_count",
+                                                      "The error number(client 
side) of inside remote inside aggregate rpc.",
+                                                      new 
MetricsTag.Keys("dest", "self"), new MetricsTag.Values(
+                                                          address
+                                                              .toString(), "N")
+                                                  );
     }
 
     @Override
@@ -160,13 +167,13 @@ public class GRPCRemoteClient implements RemoteClient {
                 streamObserver.onCompleted();
             } catch (Throwable t) {
                 remoteOutErrorCounter.inc();
-                LOGGER.error(t.getMessage(), t);
+                log.error(t.getMessage(), t);
             }
         }
 
         @Override
         public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
-            LOGGER.error(t.getMessage(), t);
+            log.error(t.getMessage(), t);
         }
 
         @Override
@@ -183,38 +190,47 @@ public class GRPCRemoteClient implements RemoteClient {
     private StreamObserver<RemoteMessage> createStreamObserver() {
         int sleepTotalMillis = 0;
         int sleepMillis = 10;
-        while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
-            concurrentStreamObserverNumber.addAndGet(-1);
 
+        // Control the concurrency of gRPC streaming stub.
+        // If over 10 created and not finished/error, this blocks the method.
+        while (concurrentStreamObserverNumber.get() > 10) {
             try {
                 Thread.sleep(sleepMillis);
             } catch (InterruptedException e) {
-                LOGGER.error(e.getMessage(), e);
+                log.error(e.getMessage(), e);
             }
 
             sleepTotalMillis += sleepMillis;
 
             if (sleepTotalMillis > 60000) {
-                LOGGER.warn("Remote client block times over 60 seconds.");
+                log.warn("Remote client [{}] block times over 60 seconds. 
Current streaming number {}",
+                         address, concurrentStreamObserverNumber.get()
+                );
+                // Reset sleepTotalMillis to avoid too many warn logs.
+                sleepTotalMillis = 0;
             }
         }
 
-        return getStub().withDeadlineAfter(remoteTimeout, 
TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
-            @Override
-            public void onNext(Empty empty) {
-            }
-
-            @Override
-            public void onError(Throwable throwable) {
-                concurrentStreamObserverNumber.addAndGet(-1);
-                LOGGER.error(throwable.getMessage(), throwable);
-            }
-
-            @Override
-            public void onCompleted() {
-                concurrentStreamObserverNumber.addAndGet(-1);
-            }
-        });
+        final StreamObserver<RemoteMessage> remoteMessageStreamObserver
+            = getStub().withDeadlineAfter(remoteTimeout, TimeUnit.SECONDS)
+                       .call(new StreamObserver<Empty>() {
+                           @Override
+                           public void onNext(Empty empty) {
+                           }
+
+                           @Override
+                           public void onError(Throwable throwable) {
+                               concurrentStreamObserverNumber.addAndGet(-1);
+                               log.error(throwable.getMessage(), throwable);
+                           }
+
+                           @Override
+                           public void onCompleted() {
+                               concurrentStreamObserverNumber.addAndGet(-1);
+                           }
+                       });
+        concurrentStreamObserverNumber.incrementAndGet();
+        return remoteMessageStreamObserver;
     }
 
     @Override

Reply via email to