This is an automated email from the ASF dual-hosted git repository.

gaoxingcun pushed a commit to branch reconfiguration_scheduling
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit 64afc2575a067f288d0315e25f0a3eb9598fc21f
Author: TJxiaobao <2922035...@qq.com>
AuthorDate: Tue Sep 9 21:24:35 2025 +0800

    add:Improve the go code specification.
---
 examples/main_simulation.go                        |  1 +
 internal/cmd/server.go                             |  7 ---
 internal/collector/basic/init.go                   | 18 ++++----
 .../common/collect/dispatch/metrics_collector.go   |  7 ++-
 .../collector/common/collect/result_handler.go     | 10 ++++-
 .../common/dispatcher/common_dispatcher.go         | 21 +++------
 internal/collector/common/dispatcher/time_wheel.go | 15 ++-----
 .../common/dispatcher/wheel_timer_task.go          |  7 ++-
 internal/collector/common/job/job_server.go        | 50 +++++++++++++---------
 9 files changed, 67 insertions(+), 69 deletions(-)

diff --git a/examples/main_simulation.go b/examples/main_simulation.go
new file mode 100644
index 0000000..c7842af
--- /dev/null
+++ b/examples/main_simulation.go
@@ -0,0 +1 @@
+package examples
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 81a8bfe..c67fef8 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -30,7 +30,6 @@ import (
        "github.com/spf13/cobra"
 
        bannerouter "hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
        jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
        clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
        transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
@@ -109,12 +108,6 @@ func server(ctx context.Context, logOut io.Writer) error {
 }
 
 func startRunners(ctx context.Context, cfg *clrServer.Server) error {
-
-       // Initialize all collectors before starting runners
-       // This ensures all protocol collectors are registered and ready
-       cfg.Logger.Info("Initializing collectors...")
-       basic.InitializeAllCollectors(cfg.Logger)
-
        runners := []struct {
                runner Runner[collectortypes.Info]
        }{
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index 74c4aa6..f4a7504 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -25,16 +25,16 @@ import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
-// init 函数在包被导入时自动执行
-// 集中注册所有协议的工厂函数
+// init function is automatically executed when the package is imported
+// It centrally registers factory functions for all protocols
 func init() {
-       // 注册所有协议的工厂函数
-       // 新增协议时只需要在这里添加一行
+       // Register factory functions for all protocols
+       // To add a new protocol, simply add a line here
        strategy.RegisterFactory("jdbc", func(logger logger.Logger) strategy.Collector {
                return database.NewJDBCCollector(logger)
        })
 
-       // 未来可以在这里添加更多协议:
+       // More protocols can be added here in the future:
        // strategy.RegisterFactory("http", func(logger logger.Logger) strategy.Collector {
        //     return http.NewHTTPCollector(logger)
        // })
@@ -43,13 +43,13 @@ func init() {
        // })
 }
 
-// InitializeAllCollectors 初始化所有已注册的采集器
-// 此时init()函数已经注册了所有工厂函数
-// 这个函数创建实际的采集器实例
+// InitializeAllCollectors initializes all registered collectors
+// At this point, the init() function has already registered all factory functions
+// This function creates the actual collector instances
 func InitializeAllCollectors(logger logger.Logger) {
        logger.Info("initializing all collectors")
 
-       // 使用已注册的工厂函数创建采集器实例
+       // Create collector instances using registered factory functions
        strategy.InitializeCollectors(logger)
 
        logger.Info("all collectors initialized successfully")
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go b/internal/collector/common/collect/dispatch/metrics_collector.go
index b569858..85e7580 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -21,8 +21,11 @@ package dispatch
 
 import (
        "fmt"
+       "net/http"
        "time"
 
+       // Import basic package with blank identifier to trigger its init() function
+       // This ensures all collector factories are registered automatically
        _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
        jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
@@ -64,7 +67,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobty
                                "protocol", metrics.Protocol,
                                "metricsName", metrics.Name)
 
-                       result := mc.createErrorResponse(metrics, job, 500, fmt.Sprintf("Collector not found: %v", err))
+                       result := mc.createErrorResponse(metrics, job, http.StatusInternalServerError, fmt.Sprintf("Collector not found: %v", err))
                        resultChan <- result
                        return
                }
@@ -98,7 +101,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobty
 
                duration := time.Since(startTime)
 
-               if result != nil && result.Code == 200 {
+               if result != nil && result.Code == http.StatusOK {
                        mc.logger.Info("metrics collection completed successfully",
                                "jobID", job.ID,
                                "metricsName", metrics.Name,
diff --git a/internal/collector/common/collect/result_handler.go b/internal/collector/common/collect/result_handler.go
index 701ca63..7f5a304 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -21,6 +21,7 @@ package collect
 
 import (
        "fmt"
+       "net/http"
 
        jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
@@ -32,8 +33,13 @@ type ResultHandlerImpl struct {
        // TODO: Add data queue or storage interface when needed
 }
 
+// ResultHandler interface for handling collection results
+type ResultHandler interface {
+       HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error
+}
+
 // NewResultHandler creates a new result handler
-func NewResultHandler(logger logger.Logger) *ResultHandlerImpl {
+func NewResultHandler(logger logger.Logger) ResultHandler {
        return &ResultHandlerImpl{
                logger: logger.WithName("result-handler"),
        }
@@ -61,7 +67,7 @@ func (rh *ResultHandlerImpl) HandleCollectData(data *jobtypes.CollectRepMetricsD
        // 4. Triggering alerts based on thresholds
        // 5. Updating monitoring status
 
-       if data.Code == 200 {
+       if data.Code == http.StatusOK {
                rh.logger.Info("successfully processed collect data",
                        "jobID", job.ID,
                        "metricsName", data.Metrics)
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go b/internal/collector/common/dispatcher/common_dispatcher.go
index d7a58c0..d6e10f4 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -25,6 +25,7 @@ import (
        "sync"
        "time"
 
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
        jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
@@ -34,9 +35,7 @@ import (
 type CommonDispatcherImpl struct {
        logger           logger.Logger
        metricsCollector MetricsCollector
-       resultHandler    ResultHandler
-       ctx              context.Context
-       cancel           context.CancelFunc
+       resultHandler    collect.ResultHandler
        mu               sync.RWMutex
 }
 
@@ -45,26 +44,19 @@ type MetricsCollector interface {
        CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData
 }
 
-// ResultHandler interface for handling collection results
-type ResultHandler interface {
-       HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error
-}
+// ResultHandler interface is now defined in collect/result_handler.go
 
 // NewCommonDispatcher creates a new common dispatcher
-func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector, resultHandler ResultHandler) *CommonDispatcherImpl {
-       ctx, cancel := context.WithCancel(context.Background())
-
+func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector, resultHandler collect.ResultHandler) *CommonDispatcherImpl {
        return &CommonDispatcherImpl{
                logger:           logger.WithName("common-dispatcher"),
                metricsCollector: metricsCollector,
                resultHandler:    resultHandler,
-               ctx:              ctx,
-               cancel:           cancel,
        }
 }
 
 // DispatchMetricsTask dispatches a job by breaking it down into individual metrics collection tasks
-func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error {
+func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout *jobtypes.Timeout) error {
        if job == nil {
                return fmt.Errorf("job cannot be nil")
        }
@@ -85,7 +77,7 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout *
        }
 
        // Create collection context with timeout
-       collectCtx, collectCancel := context.WithTimeout(cd.ctx, cd.getCollectionTimeout(job))
+       collectCtx, collectCancel := context.WithTimeout(ctx, cd.getCollectionTimeout(job))
        defer collectCancel()
 
        // Collect all metrics concurrently using channels (Go's way of handling concurrency)
@@ -239,6 +231,5 @@ func (cd *CommonDispatcherImpl) getCollectionTimeout(job *jobtypes.Job) time.Dur
 // Stop stops the common dispatcher
 func (cd *CommonDispatcherImpl) Stop() error {
        cd.logger.Info("stopping common dispatcher")
-       cd.cancel()
        return nil
 }
diff --git a/internal/collector/common/dispatcher/time_wheel.go b/internal/collector/common/dispatcher/time_wheel.go
index 2040ec0..41b6860 100644
--- a/internal/collector/common/dispatcher/time_wheel.go
+++ b/internal/collector/common/dispatcher/time_wheel.go
@@ -36,15 +36,13 @@ type TimeDispatch struct {
        commonDispatcher MetricsTaskDispatcher
        cyclicTasks      sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs
        tempTasks        sync.Map // map[int64]*jobtypes.Timeout for one-time jobs
-       ctx              context.Context
-       cancel           context.CancelFunc
        started          bool
        mu               sync.RWMutex
 }
 
 // MetricsTaskDispatcher interface for metrics task dispatching
 type MetricsTaskDispatcher interface {
-       DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error
+       DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout *jobtypes.Timeout) error
 }
 
 // HashedWheelTimer interface for time wheel operations
@@ -56,13 +54,9 @@ type HashedWheelTimer interface {
 
 // NewTimeDispatch creates a new time dispatcher
 func NewTimeDispatch(logger logger.Logger, commonDispatcher MetricsTaskDispatcher) *TimeDispatch {
-       ctx, cancel := context.WithCancel(context.Background())
-
        td := &TimeDispatch{
                logger:           logger.WithName("time-dispatch"),
                commonDispatcher: commonDispatcher,
-               ctx:              ctx,
-               cancel:           cancel,
                started:          false,
        }
 
@@ -168,9 +162,6 @@ func (td *TimeDispatch) Stop() error {
 
        td.logger.Info("stopping time dispatcher")
 
-       // Cancel context
-       td.cancel()
-
        // Cancel all running tasks
        td.cyclicTasks.Range(func(key, value interface{}) bool {
                if timeout, ok := value.(*jobtypes.Timeout); ok {
@@ -201,7 +192,7 @@ func (td *TimeDispatch) Stop() error {
 }
 
 // Stats returns dispatcher statistics
-func (td *TimeDispatch) Stats() map[string]interface{} {
+func (td *TimeDispatch) Stats() map[string]any {
        cyclicCount := 0
        tempCount := 0
 
@@ -215,7 +206,7 @@ func (td *TimeDispatch) Stats() map[string]interface{} {
                return true
        })
 
-       return map[string]interface{}{
+       return map[string]any{
                "cyclicJobs": cyclicCount,
                "tempJobs":   tempCount,
                "started":    td.started,
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go b/internal/collector/common/dispatcher/wheel_timer_task.go
index ebda8f2..3eb53e3 100644
--- a/internal/collector/common/dispatcher/wheel_timer_task.go
+++ b/internal/collector/common/dispatcher/wheel_timer_task.go
@@ -20,6 +20,7 @@
 package dispatcher
 
 import (
+       "context"
        "fmt"
        "time"
 
@@ -59,9 +60,13 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error {
 
        startTime := time.Now()
 
+       // Create a context for this specific task execution
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        // Dispatch metrics task through common dispatcher
        // This is where the job gets broken down into individual metric collection tasks
-       err := wtt.commonDispatcher.DispatchMetricsTask(wtt.job, timeout)
+       err := wtt.commonDispatcher.DispatchMetricsTask(ctx, wtt.job, timeout)
 
        duration := time.Since(startTime)
 
diff --git a/internal/collector/common/job/job_server.go b/internal/collector/common/job/job_server.go
index 31eed6f..f2f3ba2 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -21,9 +21,11 @@ package job
 
 import (
        "context"
+       "errors"
        "fmt"
        "sync"
 
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
@@ -43,23 +45,23 @@ type TimeDispatcher interface {
 // Config represents job service configuration
 type Config struct {
        clrServer.Server
+       TimeDispatch TimeDispatcher
 }
 
 // Runner implements the service runner interface
 type Runner struct {
        Config
-       timeDispatch TimeDispatcher
-       mu           sync.RWMutex
-       runningJobs  map[int64]*jobtypes.Job
-       ctx          context.Context
-       cancel       context.CancelFunc
+       mu          sync.RWMutex
+       runningJobs map[int64]*jobtypes.Job
+       ctx         context.Context
+       cancel      context.CancelFunc
 }
 
 // AddAsyncCollectJob adds a job to async collection scheduling
 func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
        if job == nil {
                r.Logger.Error(nil, "job cannot be nil")
-               return fmt.Errorf("job cannot be nil")
+               return errors.New("job cannot be nil")
        }
 
        r.Logger.Info("adding async collect job",
@@ -75,7 +77,7 @@ func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
        r.runningJobs[job.ID] = job
 
        // Add job to time dispatcher for scheduling
-       if err := r.timeDispatch.AddJob(job); err != nil {
+       if err := r.TimeDispatch.AddJob(job); err != nil {
                delete(r.runningJobs, job.ID)
                r.Logger.Error(err, "failed to add job to time dispatcher", "jobID", job.ID)
                return fmt.Errorf("failed to add job to time dispatcher: %w", err)
@@ -96,7 +98,7 @@ func (r *Runner) RemoveAsyncCollectJob(jobID int64) error {
        delete(r.runningJobs, jobID)
 
        // Remove from time dispatcher
-       if err := r.timeDispatch.RemoveJob(jobID); err != nil {
+       if err := r.TimeDispatch.RemoveJob(jobID); err != nil {
                r.Logger.Error(err, "failed to remove job from time dispatcher", "jobID", jobID)
                return fmt.Errorf("failed to remove job from time dispatcher: %w", err)
        }
@@ -133,12 +135,14 @@ func New(srv *Config) *Runner {
        // Create time dispatcher
        timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher)
 
+       // Update the config with the time dispatcher
+       srv.TimeDispatch = timeDispatch
+
        runner := &Runner{
-               Config:       *srv,
-               timeDispatch: timeDispatch,
-               runningJobs:  make(map[int64]*jobtypes.Job),
-               ctx:          ctx,
-               cancel:       cancel,
+               Config:      *srv,
+               runningJobs: make(map[int64]*jobtypes.Job),
+               ctx:         ctx,
+               cancel:      cancel,
        }
 
        return runner
@@ -149,14 +153,18 @@ func (r *Runner) Start(ctx context.Context) error {
        r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
        r.Logger.Info("Starting job service runner")
 
+       // Initialize all collectors
+       r.Logger.Info("Initializing collectors...")
+       basic.InitializeAllCollectors(r.Logger)
+
        // Start the time dispatcher
-       if r.timeDispatch != nil {
-               if err := r.timeDispatch.Start(ctx); err != nil {
+       if r.TimeDispatch != nil {
+               if err := r.TimeDispatch.Start(ctx); err != nil {
                        r.Logger.Error(err, "failed to start time dispatcher")
                        return fmt.Errorf("failed to start time dispatcher: %w", err)
                }
        } else {
-               return fmt.Errorf("time dispatcher is not initialized")
+               return errors.New("time dispatcher is not initialized")
        }
 
        r.Logger.Info("job service runner started successfully")
@@ -164,8 +172,8 @@ func (r *Runner) Start(ctx context.Context) error {
        select {
        case <-ctx.Done():
                r.Logger.Info("job service runner stopped by context")
-               if r.timeDispatch != nil {
-                       if err := r.timeDispatch.Stop(); err != nil {
+               if r.TimeDispatch != nil {
+                       if err := r.TimeDispatch.Stop(); err != nil {
                                r.Logger.Error(err, "error stopping time dispatcher")
                        }
                }
@@ -187,8 +195,8 @@ func (r *Runner) Close() error {
        r.cancel()
 
        // Stop time dispatcher
-       if r.timeDispatch != nil {
-               if err := r.timeDispatch.Stop(); err != nil {
+       if r.TimeDispatch != nil {
+               if err := r.TimeDispatch.Stop(); err != nil {
                        r.Logger.Error(err, "error stopping time dispatcher")
                        // Continue cleanup despite error
                }
@@ -199,8 +207,8 @@ func (r *Runner) Close() error {
 
        // Clear running jobs
        r.mu.Lock()
+       defer r.mu.Unlock()
        r.runningJobs = make(map[int64]*jobtypes.Job)
-       r.mu.Unlock()
 
        r.Logger.Info("job service runner closed")
        return nil


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@hertzbeat.apache.org
For additional commands, e-mail: notifications-h...@hertzbeat.apache.org

Reply via email to