This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git

commit cc2415120e42c1802f146a43edf74c0e1629bed2
Author: kezhenxu94 <kezhenx...@163.com>
AuthorDate: Sat Jun 13 23:16:50 2020 +0800

    Fix a bug and refactor some codes
---
 package.json                                       |  6 +-
 src/agent/Buffer.ts                                | 16 +++-
 src/agent/protocol/Protocol.ts                     |  3 +
 src/agent/protocol/grpc/GrpcProtocol.ts            | 90 ++--------------------
 src/agent/protocol/grpc/SegmentObjectAdapter.ts    |  3 +
 .../agent/protocol/grpc/clients/Client.ts          | 15 +---
 .../HeartbeatClient.ts}                            | 65 +++-------------
 .../TraceReportClient.ts}                          | 64 +++------------
 src/config/AgentConfig.ts                          |  2 +
 src/trace/context/ContextManager.ts                |  5 ++
 src/trace/context/SpanContext.ts                   |  5 +-
 src/trace/span/LocalSpan.ts                        |  5 +-
 src/trace/span/Span.ts                             |  6 ++
 src/trace/span/StackedSpan.ts                      |  2 +-
 14 files changed, 75 insertions(+), 212 deletions(-)

diff --git a/package.json b/package.json
index 0fcb9a0..43c164f 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
   "name": "skywalking-nodejs",
   "version": "1.0.0",
-  "description": "",
+  "description": "The NodeJS agent for Apache SkyWalking",
   "main": "lib/index.js",
   "typings": "lib/index.d.ts",
   "scripts": {
@@ -19,8 +19,8 @@
     "lib/**/*"
   ],
   "keywords": [],
-  "author": "",
-  "license": "ISC",
+  "author": "kezhenxu94",
+  "license": "Apache 2.0",
   "devDependencies": {
     "@types/google-protobuf": "^3.7.2",
     "@types/node": "^14.0.11",
diff --git a/src/agent/Buffer.ts b/src/agent/Buffer.ts
index 755848a..63c28da 100644
--- a/src/agent/Buffer.ts
+++ b/src/agent/Buffer.ts
@@ -19,12 +19,18 @@
 
 import { createLogger } from '@/logging';
 import Segment from '@/trace/context/Segment';
+import config from '@/config/AgentConfig';
 
 const logger = createLogger('Buffer');
 
 class Buffer {
-  maxSize = 1000;
-  buffer: Segment[] = [];
+  maxSize: number;
+  buffer: Segment[];
+
+  constructor(maxSize: number = 1000) {
+    this.maxSize = maxSize;
+    this.buffer = [];
+  }
 
   get length(): number {
     return this.buffer.length;
@@ -41,4 +47,8 @@ class Buffer {
   }
 }
 
-export default new Buffer();
+export default new Buffer(
+  Number.isSafeInteger(config.maxBufferSize)
+    ? Number.parseInt(config.maxBufferSize, 10)
+    : 1000,
+);
diff --git a/src/agent/protocol/Protocol.ts b/src/agent/protocol/Protocol.ts
index 45085d4..7f87c6d 100644
--- a/src/agent/protocol/Protocol.ts
+++ b/src/agent/protocol/Protocol.ts
@@ -17,6 +17,9 @@
  *
  */
 
+/**
+ * The transport protocol between the agent and the backend (OAP).
+ */
 export default interface Protocol {
   isConnected: boolean;
 
diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/GrpcProtocol.ts
index 6e04232..1597491 100644
--- a/src/agent/protocol/grpc/GrpcProtocol.ts
+++ b/src/agent/protocol/grpc/GrpcProtocol.ts
@@ -18,101 +18,21 @@
  */
 
 import Protocol from '@/agent/protocol/Protocol';
-import * as grpc from 'grpc';
-import { connectivityState } from 'grpc';
-import config from '@/config/AgentConfig';
-import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
-import { InstancePingPkg } from '@/proto/management/Management_pb';
-import { createLogger } from '@/logging';
-import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor';
-import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
-import buffer from '@/agent/Buffer';
-import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter';
-
-const logger = createLogger('GrpcProtocol');
+import HeartbeatClient from '@/agent/protocol/grpc/clients/HeartbeatClient';
+import TraceReportClient from '@/agent/protocol/grpc/clients/TraceReportClient';
 
 export default class GrpcProtocol implements Protocol {
-  heartbeatClient: ManagementServiceClient;
-  heartbeatTimer?: NodeJS.Timeout;
-
-  reporterClient: TraceSegmentReportServiceClient;
-  reportTimer?: NodeJS.Timeout;
-
-  constructor() {
-    this.heartbeatClient = new ManagementServiceClient(
-      config.collectorAddress,
-      grpc.credentials.createInsecure(),
-      { interceptors: [AuthInterceptor] },
-    );
-
-    this.reporterClient = new TraceSegmentReportServiceClient(
-      config.collectorAddress,
-      grpc.credentials.createInsecure(),
-      { interceptors: [AuthInterceptor] },
-    );
-  }
 
   get isConnected(): boolean {
-    return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY)
-      && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY);
+    return HeartbeatClient.isConnected && TraceReportClient.isConnected;
   }
 
   heartbeat() {
-    if (this.heartbeatTimer) {
-      logger.warn(`
-        The heartbeat timer has already been scheduled,
-        this may be a potential bug, please consider reporting
-        this to https://github.com/apache/skywalking/issues/new
-      `);
-      return;
-    }
-
-    const keepAlivePkg = new InstancePingPkg()
-      .setService(config.serviceName)
-      .setServiceinstance(config.serviceInstance);
-
-    this.heartbeatTimer = setInterval(() => {
-        this.heartbeatClient.keepAlive(
-          keepAlivePkg,
-
-          (error, _) => {
-            if (error) {
-              logger.error('Failed to send heartbeat', error);
-            }
-          },
-        );
-      }, 3000,
-    ).unref();
+    HeartbeatClient.start();
   }
 
   report() {
-    const reportFunction = () => {
-      try {
-        if (buffer.length === 0) {
-          return;
-        }
-
-        const stream = this.reporterClient.collect((error, _) => {
-          if (error) {
-            logger.error('Failed to report trace data', error);
-          }
-        });
-
-        while (buffer.buffer.length > 0) {
-          const segment = buffer.buffer.pop();
-          if (segment) {
-            logger.info('Sending segment', { segment });
-            stream.write(new SegmentObjectAdapter(segment));
-          }
-        }
-
-        stream.end();
-      } finally {
-        this.reportTimer = setTimeout(reportFunction, 1000).unref();
-      }
-    };
-
-    this.reportTimer = setTimeout(reportFunction, 1000).unref();
+    TraceReportClient.start();
   }
 
 };
diff --git a/src/agent/protocol/grpc/SegmentObjectAdapter.ts b/src/agent/protocol/grpc/SegmentObjectAdapter.ts
index e6c336b..b5cb16c 100644
--- a/src/agent/protocol/grpc/SegmentObjectAdapter.ts
+++ b/src/agent/protocol/grpc/SegmentObjectAdapter.ts
@@ -22,6 +22,9 @@ import config from '@/config/AgentConfig';
 import { KeyStringValuePair } from '@/proto/common/Common_pb';
 import Segment from '@/trace/context/Segment';
 
+/**
+ * An adapter that adapts {@link Segment} objects to gRPC object {@link SegmentObject}.
+ */
 export default class SegmentObjectAdapter extends SegmentObject {
   constructor(segment: Segment) {
     super();
diff --git a/typings/environment.d.ts b/src/agent/protocol/grpc/clients/Client.ts
similarity index 79%
rename from typings/environment.d.ts
rename to src/agent/protocol/grpc/clients/Client.ts
index 7c8f1fd..bbd4361 100644
--- a/typings/environment.d.ts
+++ b/src/agent/protocol/grpc/clients/Client.ts
@@ -17,15 +17,8 @@
  *
  */
 
-declare global {
-  namespace NodeJS {
-    interface ProcessEnv {
-      AUTHORIZATION?: string;
-      COLLECTOR_ADDRESS?: string;
-      SERVICE_INSTANCE?: string;
-      SERVICE_NAME?: string;
-    }
-  }
-}
+export default interface Client {
+  readonly isConnected: boolean;
 
-export {};
+  start(): void;
+}
diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/clients/HeartbeatClient.ts
similarity index 57%
copy from src/agent/protocol/grpc/GrpcProtocol.ts
copy to src/agent/protocol/grpc/clients/HeartbeatClient.ts
index 6e04232..b1d21d1 100644
--- a/src/agent/protocol/grpc/GrpcProtocol.ts
+++ b/src/agent/protocol/grpc/clients/HeartbeatClient.ts
@@ -17,47 +17,35 @@
  *
  */
 
-import Protocol from '@/agent/protocol/Protocol';
+
+import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
 import * as grpc from 'grpc';
 import { connectivityState } from 'grpc';
-import config from '@/config/AgentConfig';
-import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
-import { InstancePingPkg } from '@/proto/management/Management_pb';
 import { createLogger } from '@/logging';
 import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor';
-import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
-import buffer from '@/agent/Buffer';
-import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter';
+import { InstancePingPkg } from '@/proto/management/Management_pb';
+import config from '@/config/AgentConfig';
+import Client from '@/agent/protocol/grpc/clients/Client';
 
-const logger = createLogger('GrpcProtocol');
+const logger = createLogger('HeartbeatTask');
 
-export default class GrpcProtocol implements Protocol {
+class HeartbeatClient implements Client {
   heartbeatClient: ManagementServiceClient;
   heartbeatTimer?: NodeJS.Timeout;
 
-  reporterClient: TraceSegmentReportServiceClient;
-  reportTimer?: NodeJS.Timeout;
-
   constructor() {
     this.heartbeatClient = new ManagementServiceClient(
       config.collectorAddress,
       grpc.credentials.createInsecure(),
       { interceptors: [AuthInterceptor] },
     );
-
-    this.reporterClient = new TraceSegmentReportServiceClient(
-      config.collectorAddress,
-      grpc.credentials.createInsecure(),
-      { interceptors: [AuthInterceptor] },
-    );
   }
 
   get isConnected(): boolean {
-    return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY)
-      && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY);
+    return this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY;
   }
 
-  heartbeat() {
+  start() {
     if (this.heartbeatTimer) {
       logger.warn(`
         The heartbeat timer has already been scheduled,
@@ -81,38 +69,9 @@ export default class GrpcProtocol implements Protocol {
             }
           },
         );
-      }, 3000,
+      }, 20000,
     ).unref();
   }
+}
 
-  report() {
-    const reportFunction = () => {
-      try {
-        if (buffer.length === 0) {
-          return;
-        }
-
-        const stream = this.reporterClient.collect((error, _) => {
-          if (error) {
-            logger.error('Failed to report trace data', error);
-          }
-        });
-
-        while (buffer.buffer.length > 0) {
-          const segment = buffer.buffer.pop();
-          if (segment) {
-            logger.info('Sending segment', { segment });
-            stream.write(new SegmentObjectAdapter(segment));
-          }
-        }
-
-        stream.end();
-      } finally {
-        this.reportTimer = setTimeout(reportFunction, 1000).unref();
-      }
-    };
-
-    this.reportTimer = setTimeout(reportFunction, 1000).unref();
-  }
-
-};
+export default new HeartbeatClient();
diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/clients/TraceReportClient.ts
similarity index 56%
copy from src/agent/protocol/grpc/GrpcProtocol.ts
copy to src/agent/protocol/grpc/clients/TraceReportClient.ts
index 6e04232..0d70b83 100644
--- a/src/agent/protocol/grpc/GrpcProtocol.ts
+++ b/src/agent/protocol/grpc/clients/TraceReportClient.ts
@@ -17,34 +17,22 @@
  *
  */
 
-import Protocol from '@/agent/protocol/Protocol';
+import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
+import config from '@/config/AgentConfig';
 import * as grpc from 'grpc';
 import { connectivityState } from 'grpc';
-import config from '@/config/AgentConfig';
-import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb';
-import { InstancePingPkg } from '@/proto/management/Management_pb';
-import { createLogger } from '@/logging';
 import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor';
-import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb';
 import buffer from '@/agent/Buffer';
 import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter';
+import { createLogger } from '@/logging';
+import Client from '@/agent/protocol/grpc/clients/Client';
 
-const logger = createLogger('GrpcProtocol');
-
-export default class GrpcProtocol implements Protocol {
-  heartbeatClient: ManagementServiceClient;
-  heartbeatTimer?: NodeJS.Timeout;
+const logger = createLogger('TraceReportClient');
 
+class TraceReportClient implements Client {
   reporterClient: TraceSegmentReportServiceClient;
-  reportTimer?: NodeJS.Timeout;
 
   constructor() {
-    this.heartbeatClient = new ManagementServiceClient(
-      config.collectorAddress,
-      grpc.credentials.createInsecure(),
-      { interceptors: [AuthInterceptor] },
-    );
-
     this.reporterClient = new TraceSegmentReportServiceClient(
       config.collectorAddress,
       grpc.credentials.createInsecure(),
@@ -53,39 +41,10 @@ export default class GrpcProtocol implements Protocol {
   }
 
   get isConnected(): boolean {
-    return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY)
-      && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY);
-  }
-
-  heartbeat() {
-    if (this.heartbeatTimer) {
-      logger.warn(`
-        The heartbeat timer has already been scheduled,
-        this may be a potential bug, please consider reporting
-        this to https://github.com/apache/skywalking/issues/new
-      `);
-      return;
-    }
-
-    const keepAlivePkg = new InstancePingPkg()
-      .setService(config.serviceName)
-      .setServiceinstance(config.serviceInstance);
-
-    this.heartbeatTimer = setInterval(() => {
-        this.heartbeatClient.keepAlive(
-          keepAlivePkg,
-
-          (error, _) => {
-            if (error) {
-              logger.error('Failed to send heartbeat', error);
-            }
-          },
-        );
-      }, 3000,
-    ).unref();
+    return this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY;
   }
 
-  report() {
+  start() {
     const reportFunction = () => {
       try {
         if (buffer.length === 0) {
@@ -108,11 +67,12 @@ export default class GrpcProtocol implements Protocol {
 
         stream.end();
       } finally {
-        this.reportTimer = setTimeout(reportFunction, 1000).unref();
+        setTimeout(reportFunction, 1000).unref();
       }
     };
 
-    this.reportTimer = setTimeout(reportFunction, 1000).unref();
+    setTimeout(reportFunction, 1000).unref();
   }
+}
 
-};
+export default new TraceReportClient();
diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts
index 6270e1b..56f010b 100644
--- a/src/config/AgentConfig.ts
+++ b/src/config/AgentConfig.ts
@@ -22,6 +22,7 @@ export type AgentConfig = {
   serviceInstance?: string;
   collectorAddress?: string;
   authorization?: string;
+  maxBufferSize?: number;
 }
 
 export default {
@@ -29,4 +30,5 @@ export default {
   serviceInstance: process.env.SERVICE_INSTANCE || 'your-node-js-instance',
   collectorAddress: process.env.COLLECTOR_ADDRESS || '127.0.0.1:11800',
   authorization: process.env.AUTHORIZATION,
+  maxBufferSize: process.env.MAX_BUFFER_SIZE || '1000',
 };
diff --git a/src/trace/context/ContextManager.ts b/src/trace/context/ContextManager.ts
index 77fc7f6..2b5ce35 100644
--- a/src/trace/context/ContextManager.ts
+++ b/src/trace/context/ContextManager.ts
@@ -28,8 +28,13 @@ class ContextManager {
   constructor() {
     this.scopeContext = new Map<number, Context>();
 
+    this.scopeContext.set(1, new SpanContext());
+
     this.hooks = createHook({
       init: (asyncId: number, type: string, triggerAsyncId: number, resource: object) => {
+        if (type === 'TIMERWRAP') {
+          return;
+        }
         const context = this.scopeContext.get(triggerAsyncId) || new SpanContext();
         this.scopeContext.set(asyncId, context);
       },
diff --git a/src/trace/context/SpanContext.ts b/src/trace/context/SpanContext.ts
index f4addeb..3f92da3 100644
--- a/src/trace/context/SpanContext.ts
+++ b/src/trace/context/SpanContext.ts
@@ -27,6 +27,7 @@ import LocalSpan from '@/trace/span/LocalSpan';
 import * as assert from 'assert';
 import buffer from '@/agent/Buffer';
 import { createLogger } from '@/logging';
+import { executionAsyncId } from 'async_hooks';
 
 const logger = createLogger('SpanContext');
 
@@ -47,7 +48,7 @@ export default class SpanContext implements Context {
   }
 
   newEntrySpan(operation: string, carrier?: ContextCarrier): Span {
-    logger.debug('Creating entry span', { parentId: this.parentId });
+    logger.debug('Creating entry span', { parentId: this.parentId, executionAsyncId: executionAsyncId() });
     return new EntrySpan({
       id: this.spanId++,
       parentId: this.parentId,
@@ -57,6 +58,7 @@ export default class SpanContext implements Context {
   }
 
   newExitSpan(operation: string, peer: string, carrier?: ContextCarrier): Span {
+    logger.debug('Creating exit span', { parentId: this.parentId, executionAsyncId: executionAsyncId() });
     return new ExitSpan({
       id: this.spanId++,
       parentId: this.parentId,
@@ -66,6 +68,7 @@ export default class SpanContext implements Context {
   }
 
   newLocalSpan(operation: string): Span {
+    logger.debug('Creating local span', { parentId: this.parentId, executionAsyncId: executionAsyncId() });
     return new LocalSpan({
       id: this.spanId++,
       parentId: this.parentId,
diff --git a/src/trace/span/LocalSpan.ts b/src/trace/span/LocalSpan.ts
index 85969dc..c20e4da 100644
--- a/src/trace/span/LocalSpan.ts
+++ b/src/trace/span/LocalSpan.ts
@@ -17,11 +17,10 @@
  *
  */
 
-import StackedSpan from '@/trace/span/StackedSpan';
-import { SpanCtorOptions } from '@/trace/span/Span';
+import Span, { SpanCtorOptions } from '@/trace/span/Span';
 import { SpanType } from '@/proto/language-agent/Tracing_pb';
 
-export default class LocalSpan extends StackedSpan {
+export default class LocalSpan extends Span {
   constructor(options: SpanCtorOptions) {
     super(Object.assign(options, {
       type: SpanType.LOCAL,
diff --git a/src/trace/span/Span.ts b/src/trace/span/Span.ts
index bb690c8..388e194 100644
--- a/src/trace/span/Span.ts
+++ b/src/trace/span/Span.ts
@@ -26,6 +26,7 @@ import { ContextCarrier } from '@/trace/context/Carrier';
 import ID from '@/trace/ID';
 import SegmentRef from '@/trace/context/SegmentRef';
 import { SpanLayer, SpanType } from '@/proto/language-agent/Tracing_pb';
+import { createLogger } from '@/logging';
 
 export type SpanCtorOptions = {
   context: Context;
@@ -37,6 +38,8 @@ export type SpanCtorOptions = {
   component?: Component;
 };
 
+const logger = createLogger('Span');
+
 export default abstract class Span {
   readonly context: Context;
   readonly type: SpanType;
@@ -69,17 +72,20 @@ export default abstract class Span {
   }
 
   start(): this {
+    logger.debug('Starting span', this);
     this.startTime = new Date().getTime();
     this.context.start(this);
     return this;
   }
 
   stop(): this {
+    logger.debug('Stopping span', this);
     this.context.stop(this);
     return this;
   }
 
   finish(segment: Segment): boolean {
+    logger.debug('Finishing span', this);
     this.endTime = new Date().getTime();
     segment.archive(this);
     return true;
diff --git a/src/trace/span/StackedSpan.ts b/src/trace/span/StackedSpan.ts
index 196c7c1..357350b 100644
--- a/src/trace/span/StackedSpan.ts
+++ b/src/trace/span/StackedSpan.ts
@@ -32,7 +32,7 @@ export default class StackedSpan extends Span {
   }
 
   finish(segment: Segment): boolean {
-    logger.debug('Finishing', { depth: this.depth });
+    logger.debug('Finishing span', this);
     return --this.depth === 0 && super.finish(segment);
   }
 }

Reply via email to