This is an automated email from the ASF dual-hosted git repository. gaoxingcun pushed a commit to branch reconfiguration_scheduling in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 64afc2575a067f288d0315e25f0a3eb9598fc21f Author: TJxiaobao <2922035...@qq.com> AuthorDate: Tue Sep 9 21:24:35 2025 +0800 add:Improve the go code specification. --- examples/main_simulation.go | 1 + internal/cmd/server.go | 7 --- internal/collector/basic/init.go | 18 ++++---- .../common/collect/dispatch/metrics_collector.go | 7 ++- .../collector/common/collect/result_handler.go | 10 ++++- .../common/dispatcher/common_dispatcher.go | 21 +++------ internal/collector/common/dispatcher/time_wheel.go | 15 ++----- .../common/dispatcher/wheel_timer_task.go | 7 ++- internal/collector/common/job/job_server.go | 50 +++++++++++++--------- 9 files changed, 67 insertions(+), 69 deletions(-) diff --git a/examples/main_simulation.go b/examples/main_simulation.go new file mode 100644 index 0000000..c7842af --- /dev/null +++ b/examples/main_simulation.go @@ -0,0 +1 @@ +package examples diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 81a8bfe..c67fef8 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -30,7 +30,6 @@ import ( "github.com/spf13/cobra" bannerouter "hertzbeat.apache.org/hertzbeat-collector-go/internal/banner" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job" clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport" @@ -109,12 +108,6 @@ func server(ctx context.Context, logOut io.Writer) error { } func startRunners(ctx context.Context, cfg *clrServer.Server) error { - - // Initialize all collectors before starting runners - // This ensures all protocol collectors are registered and ready - cfg.Logger.Info("Initializing collectors...") - basic.InitializeAllCollectors(cfg.Logger) - runners := []struct { runner Runner[collectortypes.Info] }{ diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go index 74c4aa6..f4a7504 100644 --- a/internal/collector/basic/init.go +++ b/internal/collector/basic/init.go @@ -25,16 +25,16 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) -// init 函数在包被导入时自动执行 -// 集中注册所有协议的工厂函数 +// init function is automatically executed when the package is imported +// It centrally registers factory functions for all protocols func init() { - // 注册所有协议的工厂函数 - // 新增协议时只需要在这里添加一行 + // Register factory functions for all protocols + // To add a new protocol, simply add a line here strategy.RegisterFactory("jdbc", func(logger logger.Logger) strategy.Collector { return database.NewJDBCCollector(logger) }) - // 未来可以在这里添加更多协议: + // More protocols can be added here in the future: // strategy.RegisterFactory("http", func(logger logger.Logger) strategy.Collector { // return http.NewHTTPCollector(logger) // }) @@ -43,13 +43,13 @@ func init() { // }) } -// InitializeAllCollectors 初始化所有已注册的采集器 -// 此时init()函数已经注册了所有工厂函数 -// 这个函数创建实际的采集器实例 +// InitializeAllCollectors initializes all registered collectors +// At this point, the init() function has already registered all factory functions +// This function creates the actual collector instances func InitializeAllCollectors(logger logger.Logger) { logger.Info("initializing all collectors") - // 使用已注册的工厂函数创建采集器实例 + // Create collector instances using registered factory functions strategy.InitializeCollectors(logger) logger.Info("all collectors initialized successfully") diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go b/internal/collector/common/collect/dispatch/metrics_collector.go index b569858..85e7580 100644 --- a/internal/collector/common/collect/dispatch/metrics_collector.go +++ b/internal/collector/common/collect/dispatch/metrics_collector.go @@ -21,8 +21,11 @@ package dispatch import ( "fmt" + "net/http" "time" + // Import basic package with blank identifier to trigger its init() function + // This ensures all collector factories are registered automatically _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" @@ -64,7 +67,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobty "protocol", metrics.Protocol, "metricsName", metrics.Name) - result := mc.createErrorResponse(metrics, job, 500, fmt.Sprintf("Collector not found: %v", err)) + result := mc.createErrorResponse(metrics, job, http.StatusInternalServerError, fmt.Sprintf("Collector not found: %v", err)) resultChan <- result return } @@ -98,7 +101,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobty duration := time.Since(startTime) - if result != nil && result.Code == 200 { + if result != nil && result.Code == http.StatusOK { mc.logger.Info("metrics collection completed successfully", "jobID", job.ID, "metricsName", metrics.Name, diff --git a/internal/collector/common/collect/result_handler.go b/internal/collector/common/collect/result_handler.go index 701ca63..7f5a304 100644 --- a/internal/collector/common/collect/result_handler.go +++ b/internal/collector/common/collect/result_handler.go @@ -21,6 +21,7 @@ package collect import ( "fmt" + "net/http" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" @@ -32,8 +33,13 @@ type ResultHandlerImpl struct { // TODO: Add data queue or storage interface when needed } +// ResultHandler interface for handling collection results +type ResultHandler interface { + HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error +} + // NewResultHandler creates a new result handler -func NewResultHandler(logger logger.Logger) *ResultHandlerImpl { +func NewResultHandler(logger logger.Logger) ResultHandler { return &ResultHandlerImpl{ logger: logger.WithName("result-handler"), } @@ -61,7 +67,7 @@ func (rh *ResultHandlerImpl) HandleCollectData(data *jobtypes.CollectRepMetricsD // 4. Triggering alerts based on thresholds // 5. Updating monitoring status - if data.Code == 200 { + if data.Code == http.StatusOK { rh.logger.Info("successfully processed collect data", "jobID", job.ID, "metricsName", data.Metrics) diff --git a/internal/collector/common/dispatcher/common_dispatcher.go b/internal/collector/common/dispatcher/common_dispatcher.go index d7a58c0..d6e10f4 100644 --- a/internal/collector/common/dispatcher/common_dispatcher.go +++ b/internal/collector/common/dispatcher/common_dispatcher.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -34,9 +35,7 @@ import ( type CommonDispatcherImpl struct { logger logger.Logger metricsCollector MetricsCollector - resultHandler ResultHandler - ctx context.Context - cancel context.CancelFunc + resultHandler collect.ResultHandler mu sync.RWMutex } @@ -45,26 +44,19 @@ type MetricsCollector interface { CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData } -// ResultHandler interface for handling collection results -type ResultHandler interface { - HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error -} +// ResultHandler interface is now defined in collect/result_handler.go // NewCommonDispatcher creates a new common dispatcher -func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector, resultHandler ResultHandler) *CommonDispatcherImpl { - ctx, cancel := context.WithCancel(context.Background()) - +func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector, resultHandler collect.ResultHandler) *CommonDispatcherImpl { return &CommonDispatcherImpl{ logger: logger.WithName("common-dispatcher"), metricsCollector: metricsCollector, resultHandler: resultHandler, - ctx: ctx, - cancel: cancel, } } // DispatchMetricsTask dispatches a job by breaking it down into individual metrics collection tasks -func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error { +func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout *jobtypes.Timeout) error { if job == nil { return fmt.Errorf("job cannot be nil") } @@ -85,7 +77,7 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout * } // Create collection context with timeout - collectCtx, collectCancel := context.WithTimeout(cd.ctx, cd.getCollectionTimeout(job)) + collectCtx, collectCancel := context.WithTimeout(ctx, cd.getCollectionTimeout(job)) defer collectCancel() // Collect all metrics concurrently using channels (Go's way of handling concurrency) @@ -239,6 +231,5 @@ func (cd *CommonDispatcherImpl) getCollectionTimeout(job *jobtypes.Job) time.Dur // Stop stops the common dispatcher func (cd *CommonDispatcherImpl) Stop() error { cd.logger.Info("stopping common dispatcher") - cd.cancel() return nil } diff --git a/internal/collector/common/dispatcher/time_wheel.go b/internal/collector/common/dispatcher/time_wheel.go index 2040ec0..41b6860 100644 --- a/internal/collector/common/dispatcher/time_wheel.go +++ b/internal/collector/common/dispatcher/time_wheel.go @@ -36,15 +36,13 @@ type TimeDispatch struct { commonDispatcher MetricsTaskDispatcher cyclicTasks sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs tempTasks sync.Map // map[int64]*jobtypes.Timeout for one-time jobs - ctx context.Context - cancel context.CancelFunc started bool mu sync.RWMutex } // MetricsTaskDispatcher interface for metrics task dispatching type MetricsTaskDispatcher interface { - DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error + DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout *jobtypes.Timeout) error } // HashedWheelTimer interface for time wheel operations @@ -56,13 +54,9 @@ type HashedWheelTimer interface { // NewTimeDispatch creates a new time dispatcher func NewTimeDispatch(logger logger.Logger, commonDispatcher MetricsTaskDispatcher) *TimeDispatch { - ctx, cancel := context.WithCancel(context.Background()) - td := &TimeDispatch{ logger: logger.WithName("time-dispatch"), commonDispatcher: commonDispatcher, - ctx: ctx, - cancel: cancel, started: false, } @@ -168,9 +162,6 @@ func (td *TimeDispatch) Stop() error { td.logger.Info("stopping time dispatcher") - // Cancel context - td.cancel() - // Cancel all running tasks td.cyclicTasks.Range(func(key, value interface{}) bool { if timeout, ok := value.(*jobtypes.Timeout); ok { @@ -201,7 +192,7 @@ func (td *TimeDispatch) Stop() error { } // Stats returns dispatcher statistics -func (td *TimeDispatch) Stats() map[string]interface{} { +func (td *TimeDispatch) Stats() map[string]any { cyclicCount := 0 tempCount := 0 @@ -215,7 +206,7 @@ func (td *TimeDispatch) Stats() map[string]interface{} { return true }) - return map[string]interface{}{ + return map[string]any{ "cyclicJobs": cyclicCount, "tempJobs": tempCount, "started": td.started, diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go b/internal/collector/common/dispatcher/wheel_timer_task.go index ebda8f2..3eb53e3 100644 --- a/internal/collector/common/dispatcher/wheel_timer_task.go +++ b/internal/collector/common/dispatcher/wheel_timer_task.go @@ -20,6 +20,7 @@ package dispatcher import ( + "context" "fmt" "time" @@ -59,9 +60,13 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { startTime := time.Now() + // Create a context for this specific task execution + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Dispatch metrics task through common dispatcher // This is where the job gets broken down into individual metric collection tasks - err := wtt.commonDispatcher.DispatchMetricsTask(wtt.job, timeout) + err := wtt.commonDispatcher.DispatchMetricsTask(ctx, wtt.job, timeout) duration := time.Since(startTime) diff --git a/internal/collector/common/job/job_server.go b/internal/collector/common/job/job_server.go index 31eed6f..f2f3ba2 100644 --- a/internal/collector/common/job/job_server.go +++ b/internal/collector/common/job/job_server.go @@ -21,9 +21,11 @@ package job import ( "context" + "errors" "fmt" "sync" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher" @@ -43,23 +45,23 @@ type TimeDispatcher interface { // Config represents job service configuration type Config struct { clrServer.Server + TimeDispatch TimeDispatcher } // Runner implements the service runner interface type Runner struct { Config - timeDispatch TimeDispatcher - mu sync.RWMutex - runningJobs map[int64]*jobtypes.Job - ctx context.Context - cancel context.CancelFunc + mu sync.RWMutex + runningJobs map[int64]*jobtypes.Job + ctx context.Context + cancel context.CancelFunc } // AddAsyncCollectJob adds a job to async collection scheduling func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error { if job == nil { r.Logger.Error(nil, "job cannot be nil") - return fmt.Errorf("job cannot be nil") + return errors.New("job cannot be nil") } r.Logger.Info("adding async collect job", @@ -75,7 +77,7 @@ func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error { r.runningJobs[job.ID] = job // Add job to time dispatcher for scheduling - if err := r.timeDispatch.AddJob(job); err != nil { + if err := r.TimeDispatch.AddJob(job); err != nil { delete(r.runningJobs, job.ID) r.Logger.Error(err, "failed to add job to time dispatcher", "jobID", job.ID) return fmt.Errorf("failed to add job to time dispatcher: %w", err) @@ -96,7 +98,7 @@ func (r *Runner) RemoveAsyncCollectJob(jobID int64) error { delete(r.runningJobs, jobID) // Remove from time dispatcher - if err := r.timeDispatch.RemoveJob(jobID); err != nil { + if err := r.TimeDispatch.RemoveJob(jobID); err != nil { r.Logger.Error(err, "failed to remove job from time dispatcher", "jobID", jobID) return fmt.Errorf("failed to remove job from time dispatcher: %w", err) } @@ -133,12 +135,14 @@ func New(srv *Config) *Runner { // Create time dispatcher timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher) + // Update the config with the time dispatcher + srv.TimeDispatch = timeDispatch + runner := &Runner{ - Config: *srv, - timeDispatch: timeDispatch, - runningJobs: make(map[int64]*jobtypes.Job), - ctx: ctx, - cancel: cancel, + Config: *srv, + runningJobs: make(map[int64]*jobtypes.Job), + ctx: ctx, + cancel: cancel, } return runner @@ -149,14 +153,18 @@ func (r *Runner) Start(ctx context.Context) error { r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) r.Logger.Info("Starting job service runner") + // Initialize all collectors + r.Logger.Info("Initializing collectors...") + basic.InitializeAllCollectors(r.Logger) + // Start the time dispatcher - if r.timeDispatch != nil { - if err := r.timeDispatch.Start(ctx); err != nil { + if r.TimeDispatch != nil { + if err := r.TimeDispatch.Start(ctx); err != nil { r.Logger.Error(err, "failed to start time dispatcher") return fmt.Errorf("failed to start time dispatcher: %w", err) } } else { - return fmt.Errorf("time dispatcher is not initialized") + return errors.New("time dispatcher is not initialized") } r.Logger.Info("job service runner started successfully") @@ -164,8 +172,8 @@ func (r *Runner) Start(ctx context.Context) error { select { case <-ctx.Done(): r.Logger.Info("job service runner stopped by context") - if r.timeDispatch != nil { - if err := r.timeDispatch.Stop(); err != nil { + if r.TimeDispatch != nil { + if err := r.TimeDispatch.Stop(); err != nil { r.Logger.Error(err, "error stopping time dispatcher") } } @@ -187,8 +195,8 @@ func (r *Runner) Close() error { r.cancel() // Stop time dispatcher - if r.timeDispatch != nil { - if err := r.timeDispatch.Stop(); err != nil { + if r.TimeDispatch != nil { + if err := r.TimeDispatch.Stop(); err != nil { r.Logger.Error(err, "error stopping time dispatcher") // Continue cleanup despite error } @@ -199,8 +207,8 @@ func (r *Runner) Close() error { // Clear running jobs r.mu.Lock() + defer r.mu.Unlock() r.runningJobs = make(map[int64]*jobtypes.Job) - r.mu.Unlock() r.Logger.Info("job service runner closed") return nil --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@hertzbeat.apache.org For additional commands, e-mail: notifications-h...@hertzbeat.apache.org