This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git
commit cc2415120e42c1802f146a43edf74c0e1629bed2 Author: kezhenxu94 <kezhenx...@163.com> AuthorDate: Sat Jun 13 23:16:50 2020 +0800 Fix a bug and refactor some codes --- package.json | 6 +- src/agent/Buffer.ts | 16 +++- src/agent/protocol/Protocol.ts | 3 + src/agent/protocol/grpc/GrpcProtocol.ts | 90 ++-------------------- src/agent/protocol/grpc/SegmentObjectAdapter.ts | 3 + .../agent/protocol/grpc/clients/Client.ts | 15 +--- .../HeartbeatClient.ts} | 65 +++------------- .../TraceReportClient.ts} | 64 +++------------ src/config/AgentConfig.ts | 2 + src/trace/context/ContextManager.ts | 5 ++ src/trace/context/SpanContext.ts | 5 +- src/trace/span/LocalSpan.ts | 5 +- src/trace/span/Span.ts | 6 ++ src/trace/span/StackedSpan.ts | 2 +- 14 files changed, 75 insertions(+), 212 deletions(-) diff --git a/package.json b/package.json index 0fcb9a0..43c164f 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "skywalking-nodejs", "version": "1.0.0", - "description": "", + "description": "The NodeJS agent for Apache SkyWalking", "main": "lib/index.js", "typings": "lib/index.d.ts", "scripts": { @@ -19,8 +19,8 @@ "lib/**/*" ], "keywords": [], - "author": "", - "license": "ISC", + "author": "kezhenxu94", + "license": "Apache 2.0", "devDependencies": { "@types/google-protobuf": "^3.7.2", "@types/node": "^14.0.11", diff --git a/src/agent/Buffer.ts b/src/agent/Buffer.ts index 755848a..63c28da 100644 --- a/src/agent/Buffer.ts +++ b/src/agent/Buffer.ts @@ -19,12 +19,18 @@ import { createLogger } from '@/logging'; import Segment from '@/trace/context/Segment'; +import config from '@/config/AgentConfig'; const logger = createLogger('Buffer'); class Buffer { - maxSize = 1000; - buffer: Segment[] = []; + maxSize: number; + buffer: Segment[]; + + constructor(maxSize: number = 1000) { + this.maxSize = maxSize; + this.buffer = []; + } get length(): number { return this.buffer.length; @@ -41,4 +47,8 @@ class Buffer { } } -export default new Buffer(); +export default new Buffer( + Number.isSafeInteger(config.maxBufferSize) + ? Number.parseInt(config.maxBufferSize, 10) + : 1000, +); diff --git a/src/agent/protocol/Protocol.ts b/src/agent/protocol/Protocol.ts index 45085d4..7f87c6d 100644 --- a/src/agent/protocol/Protocol.ts +++ b/src/agent/protocol/Protocol.ts @@ -17,6 +17,9 @@ * */ +/** + * The transport protocol between the agent and the backend (OAP). + */ export default interface Protocol { isConnected: boolean; diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/GrpcProtocol.ts index 6e04232..1597491 100644 --- a/src/agent/protocol/grpc/GrpcProtocol.ts +++ b/src/agent/protocol/grpc/GrpcProtocol.ts @@ -18,101 +18,21 @@ */ import Protocol from '@/agent/protocol/Protocol'; -import * as grpc from 'grpc'; -import { connectivityState } from 'grpc'; -import config from '@/config/AgentConfig'; -import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb'; -import { InstancePingPkg } from '@/proto/management/Management_pb'; -import { createLogger } from '@/logging'; -import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor'; -import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb'; -import buffer from '@/agent/Buffer'; -import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter'; - -const logger = createLogger('GrpcProtocol'); +import HeartbeatClient from '@/agent/protocol/grpc/clients/HeartbeatClient'; +import TraceReportClient from '@/agent/protocol/grpc/clients/TraceReportClient'; export default class GrpcProtocol implements Protocol { - heartbeatClient: ManagementServiceClient; - heartbeatTimer?: NodeJS.Timeout; - - reporterClient: TraceSegmentReportServiceClient; - reportTimer?: NodeJS.Timeout; - - constructor() { - this.heartbeatClient = new ManagementServiceClient( - config.collectorAddress, - grpc.credentials.createInsecure(), - { interceptors: [AuthInterceptor] }, - ); - - this.reporterClient = new TraceSegmentReportServiceClient( - config.collectorAddress, - grpc.credentials.createInsecure(), - { interceptors: [AuthInterceptor] }, - ); - } get isConnected(): boolean { - return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY) - && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY); + return HeartbeatClient.isConnected && TraceReportClient.isConnected; } heartbeat() { - if (this.heartbeatTimer) { - logger.warn(` - The heartbeat timer has already been scheduled, - this may be a potential bug, please consider reporting - this to https://github.com/apache/skywalking/issues/new - `); - return; - } - - const keepAlivePkg = new InstancePingPkg() - .setService(config.serviceName) - .setServiceinstance(config.serviceInstance); - - this.heartbeatTimer = setInterval(() => { - this.heartbeatClient.keepAlive( - keepAlivePkg, - - (error, _) => { - if (error) { - logger.error('Failed to send heartbeat', error); - } - }, - ); - }, 3000, - ).unref(); + HeartbeatClient.start(); } report() { - const reportFunction = () => { - try { - if (buffer.length === 0) { - return; - } - - const stream = this.reporterClient.collect((error, _) => { - if (error) { - logger.error('Failed to report trace data', error); - } - }); - - while (buffer.buffer.length > 0) { - const segment = buffer.buffer.pop(); - if (segment) { - logger.info('Sending segment', { segment }); - stream.write(new SegmentObjectAdapter(segment)); - } - } - - stream.end(); - } finally { - this.reportTimer = setTimeout(reportFunction, 1000).unref(); - } - }; - - this.reportTimer = setTimeout(reportFunction, 1000).unref(); + TraceReportClient.start(); } }; diff --git a/src/agent/protocol/grpc/SegmentObjectAdapter.ts b/src/agent/protocol/grpc/SegmentObjectAdapter.ts index e6c336b..b5cb16c 100644 --- a/src/agent/protocol/grpc/SegmentObjectAdapter.ts +++ b/src/agent/protocol/grpc/SegmentObjectAdapter.ts @@ -22,6 +22,9 @@ import config from '@/config/AgentConfig'; import { KeyStringValuePair } from '@/proto/common/Common_pb'; import Segment from '@/trace/context/Segment'; +/** + * An adapter that adapts {@link Segment} objects to gRPC object {@link SegmentObject}. + */ export default class SegmentObjectAdapter extends SegmentObject { constructor(segment: Segment) { super(); diff --git a/typings/environment.d.ts b/src/agent/protocol/grpc/clients/Client.ts similarity index 79% rename from typings/environment.d.ts rename to src/agent/protocol/grpc/clients/Client.ts index 7c8f1fd..bbd4361 100644 --- a/typings/environment.d.ts +++ b/src/agent/protocol/grpc/clients/Client.ts @@ -17,15 +17,8 @@ * */ -declare global { - namespace NodeJS { - interface ProcessEnv { - AUTHORIZATION?: string; - COLLECTOR_ADDRESS?: string; - SERVICE_INSTANCE?: string; - SERVICE_NAME?: string; - } - } -} +export default interface Client { + readonly isConnected: boolean; -export {}; + start(): void; +} diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/clients/HeartbeatClient.ts similarity index 57% copy from src/agent/protocol/grpc/GrpcProtocol.ts copy to src/agent/protocol/grpc/clients/HeartbeatClient.ts index 6e04232..b1d21d1 100644 --- a/src/agent/protocol/grpc/GrpcProtocol.ts +++ b/src/agent/protocol/grpc/clients/HeartbeatClient.ts @@ -17,47 +17,35 @@ * */ -import Protocol from '@/agent/protocol/Protocol'; + +import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb'; import * as grpc from 'grpc'; import { connectivityState } from 'grpc'; -import config from '@/config/AgentConfig'; -import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb'; -import { InstancePingPkg } from '@/proto/management/Management_pb'; import { createLogger } from '@/logging'; import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor'; -import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb'; -import buffer from '@/agent/Buffer'; -import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter'; +import { InstancePingPkg } from '@/proto/management/Management_pb'; +import config from '@/config/AgentConfig'; +import Client from '@/agent/protocol/grpc/clients/Client'; -const logger = createLogger('GrpcProtocol'); +const logger = createLogger('HeartbeatTask'); -export default class GrpcProtocol implements Protocol { +class HeartbeatClient implements Client { heartbeatClient: ManagementServiceClient; heartbeatTimer?: NodeJS.Timeout; - reporterClient: TraceSegmentReportServiceClient; - reportTimer?: NodeJS.Timeout; - constructor() { this.heartbeatClient = new ManagementServiceClient( config.collectorAddress, grpc.credentials.createInsecure(), { interceptors: [AuthInterceptor] }, ); - - this.reporterClient = new TraceSegmentReportServiceClient( - config.collectorAddress, - grpc.credentials.createInsecure(), - { interceptors: [AuthInterceptor] }, - ); } get isConnected(): boolean { - return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY) - && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY); + return this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY; } - heartbeat() { + start() { if (this.heartbeatTimer) { logger.warn(` The heartbeat timer has already been scheduled, @@ -81,38 +69,9 @@ export default class GrpcProtocol implements Protocol { } }, ); - }, 3000, + }, 20000, ).unref(); } +} - report() { - const reportFunction = () => { - try { - if (buffer.length === 0) { - return; - } - - const stream = this.reporterClient.collect((error, _) => { - if (error) { - logger.error('Failed to report trace data', error); - } - }); - - while (buffer.buffer.length > 0) { - const segment = buffer.buffer.pop(); - if (segment) { - logger.info('Sending segment', { segment }); - stream.write(new SegmentObjectAdapter(segment)); - } - } - - stream.end(); - } finally { - this.reportTimer = setTimeout(reportFunction, 1000).unref(); - } - }; - - this.reportTimer = setTimeout(reportFunction, 1000).unref(); - } - -}; +export default new HeartbeatClient(); diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/clients/TraceReportClient.ts similarity index 56% copy from src/agent/protocol/grpc/GrpcProtocol.ts copy to src/agent/protocol/grpc/clients/TraceReportClient.ts index 6e04232..0d70b83 100644 --- a/src/agent/protocol/grpc/GrpcProtocol.ts +++ b/src/agent/protocol/grpc/clients/TraceReportClient.ts @@ -17,34 +17,22 @@ * */ -import Protocol from '@/agent/protocol/Protocol'; +import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb'; +import config from '@/config/AgentConfig'; import * as grpc from 'grpc'; import { connectivityState } from 'grpc'; -import config from '@/config/AgentConfig'; -import { ManagementServiceClient } from '@/proto/management/Management_grpc_pb'; -import { InstancePingPkg } from '@/proto/management/Management_pb'; -import { createLogger } from '@/logging'; import AuthInterceptor from '@/agent/protocol/grpc/AuthInterceptor'; -import { TraceSegmentReportServiceClient } from '@/proto/language-agent/Tracing_grpc_pb'; import buffer from '@/agent/Buffer'; import SegmentObjectAdapter from '@/agent/protocol/grpc/SegmentObjectAdapter'; +import { createLogger } from '@/logging'; +import Client from '@/agent/protocol/grpc/clients/Client'; -const logger = createLogger('GrpcProtocol'); - -export default class GrpcProtocol implements Protocol { - heartbeatClient: ManagementServiceClient; - heartbeatTimer?: NodeJS.Timeout; +const logger = createLogger('TraceReportClient'); +class TraceReportClient implements Client { reporterClient: TraceSegmentReportServiceClient; - reportTimer?: NodeJS.Timeout; constructor() { - this.heartbeatClient = new ManagementServiceClient( - config.collectorAddress, - grpc.credentials.createInsecure(), - { interceptors: [AuthInterceptor] }, - ); - this.reporterClient = new TraceSegmentReportServiceClient( config.collectorAddress, grpc.credentials.createInsecure(), @@ -53,39 +41,10 @@ export default class GrpcProtocol implements Protocol { } get isConnected(): boolean { - return (this.heartbeatClient.getChannel().getConnectivityState(true) === connectivityState.READY) - && (this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY); - } - - heartbeat() { - if (this.heartbeatTimer) { - logger.warn(` - The heartbeat timer has already been scheduled, - this may be a potential bug, please consider reporting - this to https://github.com/apache/skywalking/issues/new - `); - return; - } - - const keepAlivePkg = new InstancePingPkg() - .setService(config.serviceName) - .setServiceinstance(config.serviceInstance); - - this.heartbeatTimer = setInterval(() => { - this.heartbeatClient.keepAlive( - keepAlivePkg, - - (error, _) => { - if (error) { - logger.error('Failed to send heartbeat', error); - } - }, - ); - }, 3000, - ).unref(); + return this.reporterClient.getChannel().getConnectivityState(true) === connectivityState.READY; } - report() { + start() { const reportFunction = () => { try { if (buffer.length === 0) { @@ -108,11 +67,12 @@ export default class GrpcProtocol implements Protocol { stream.end(); } finally { - this.reportTimer = setTimeout(reportFunction, 1000).unref(); + setTimeout(reportFunction, 1000).unref(); } }; - this.reportTimer = setTimeout(reportFunction, 1000).unref(); + setTimeout(reportFunction, 1000).unref(); } +} -}; +export default new TraceReportClient(); diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts index 6270e1b..56f010b 100644 --- a/src/config/AgentConfig.ts +++ b/src/config/AgentConfig.ts @@ -22,6 +22,7 @@ export type AgentConfig = { serviceInstance?: string; collectorAddress?: string; authorization?: string; + maxBufferSize?: number; } export default { @@ -29,4 +30,5 @@ export default { serviceInstance: process.env.SERVICE_INSTANCE || 'your-node-js-instance', collectorAddress: process.env.COLLECTOR_ADDRESS || '127.0.0.1:11800', authorization: process.env.AUTHORIZATION, + maxBufferSize: process.env.MAX_BUFFER_SIZE || '1000', }; diff --git a/src/trace/context/ContextManager.ts b/src/trace/context/ContextManager.ts index 77fc7f6..2b5ce35 100644 --- a/src/trace/context/ContextManager.ts +++ b/src/trace/context/ContextManager.ts @@ -28,8 +28,13 @@ class ContextManager { constructor() { this.scopeContext = new Map<number, Context>(); + this.scopeContext.set(1, new SpanContext()); + this.hooks = createHook({ init: (asyncId: number, type: string, triggerAsyncId: number, resource: object) => { + if (type === 'TIMERWRAP') { + return; + } const context = this.scopeContext.get(triggerAsyncId) || new SpanContext(); this.scopeContext.set(asyncId, context); }, diff --git a/src/trace/context/SpanContext.ts b/src/trace/context/SpanContext.ts index f4addeb..3f92da3 100644 --- a/src/trace/context/SpanContext.ts +++ b/src/trace/context/SpanContext.ts @@ -27,6 +27,7 @@ import LocalSpan from '@/trace/span/LocalSpan'; import * as assert from 'assert'; import buffer from '@/agent/Buffer'; import { createLogger } from '@/logging'; +import { executionAsyncId } from 'async_hooks'; const logger = createLogger('SpanContext'); @@ -47,7 +48,7 @@ export default class SpanContext implements Context { } newEntrySpan(operation: string, carrier?: ContextCarrier): Span { - logger.debug('Creating entry span', { parentId: this.parentId }); + logger.debug('Creating entry span', { parentId: this.parentId, executionAsyncId: executionAsyncId() }); return new EntrySpan({ id: this.spanId++, parentId: this.parentId, @@ -57,6 +58,7 @@ export default class SpanContext implements Context { } newExitSpan(operation: string, peer: string, carrier?: ContextCarrier): Span { + logger.debug('Creating exit span', { parentId: this.parentId, executionAsyncId: executionAsyncId() }); return new ExitSpan({ id: this.spanId++, parentId: this.parentId, @@ -66,6 +68,7 @@ export default class SpanContext implements Context { } newLocalSpan(operation: string): Span { + logger.debug('Creating local span', { parentId: this.parentId, executionAsyncId: executionAsyncId() }); return new LocalSpan({ id: this.spanId++, parentId: this.parentId, diff --git a/src/trace/span/LocalSpan.ts b/src/trace/span/LocalSpan.ts index 85969dc..c20e4da 100644 --- a/src/trace/span/LocalSpan.ts +++ b/src/trace/span/LocalSpan.ts @@ -17,11 +17,10 @@ * */ -import StackedSpan from '@/trace/span/StackedSpan'; -import { SpanCtorOptions } from '@/trace/span/Span'; +import Span, { SpanCtorOptions } from '@/trace/span/Span'; import { SpanType } from '@/proto/language-agent/Tracing_pb'; -export default class LocalSpan extends StackedSpan { +export default class LocalSpan extends Span { constructor(options: SpanCtorOptions) { super(Object.assign(options, { type: SpanType.LOCAL, diff --git a/src/trace/span/Span.ts b/src/trace/span/Span.ts index bb690c8..388e194 100644 --- a/src/trace/span/Span.ts +++ b/src/trace/span/Span.ts @@ -26,6 +26,7 @@ import { ContextCarrier } from '@/trace/context/Carrier'; import ID from '@/trace/ID'; import SegmentRef from '@/trace/context/SegmentRef'; import { SpanLayer, SpanType } from '@/proto/language-agent/Tracing_pb'; +import { createLogger } from '@/logging'; export type SpanCtorOptions = { context: Context; @@ -37,6 +38,8 @@ export type SpanCtorOptions = { component?: Component; }; +const logger = createLogger('Span'); + export default abstract class Span { readonly context: Context; readonly type: SpanType; @@ -69,17 +72,20 @@ export default abstract class Span { } start(): this { + logger.debug('Starting span', this); this.startTime = new Date().getTime(); this.context.start(this); return this; } stop(): this { + logger.debug('Stopping span', this); this.context.stop(this); return this; } finish(segment: Segment): boolean { + logger.debug('Finishing span', this); this.endTime = new Date().getTime(); segment.archive(this); return true; diff --git a/src/trace/span/StackedSpan.ts b/src/trace/span/StackedSpan.ts index 196c7c1..357350b 100644 --- a/src/trace/span/StackedSpan.ts +++ b/src/trace/span/StackedSpan.ts @@ -32,7 +32,7 @@ export default class StackedSpan extends Span { } finish(segment: Segment): boolean { - logger.debug('Finishing', { depth: this.depth }); + logger.debug('Finishing span', this); return --this.depth === 0 && super.finish(segment); } }