EvanLjp commented on a change in pull request #10:
URL: 
https://github.com/apache/skywalking-satellite/pull/10#discussion_r545701435



##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "os"
+       "sync"
+
+       "github.com/grandecola/mmap"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/event"
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/plugins/queue/api"
+       "github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+       sync.Mutex
+       // config
+       SegmentSize           int    `mapstructure:"segment_size"`            
// The size of each segment. The unit is byte.
+       MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     
// The max num of segments in memory.
+       QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` 
// The capacity of Queue = segment_size * queue_capacity_segments.
+       FlushPeriod           int    `mapstructure:"flush_period"`            
// The period flush time. The unit is ms.
+       FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       
// The max number in one flush time.
+       MaxEventSize          int    `mapstructure:"max_event_size"`          
// The max size of the input event.
+       QueueDir              string `mapstructure:"queue_dir"`               
// Contains all files in the queue.
+
+       // running components
+       meta                   *meta.Metadata // The metadata file.
+       segments               []*mmap.File   // The data files.
+       mmapCount              int            // The number of the memory 
mapped files.
+       unflushedNum           int            // The unflushed number.
+       flushChannel           chan struct{}  // The flushChannel channel would 
receive a signal when the unflushedNum reach the flush_ceiling_num.
+       insufficientMemChannel chan struct{}  // Notify when memory is 
insufficient
+       sufficientMemChannel   chan struct{}  // Notify when memory is 
sufficient
+
+       // control components
+       ctx        context.Context    // Parent ctx
+       cancel     context.CancelFunc // Parent ctx cancel function
+       showDownWg sync.WaitGroup     // The shutdown wait group.
+
+       bufPool *sync.Pool
+
+       encoder *Encoder
+       decoder *Decoder
+}
+
+func (q *Queue) Name() string {
+       return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+       return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+       return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+       q.encoder = NewEncoder()
+       q.decoder = NewDecoder()
+
+       q.bufPool = &sync.Pool{New: func() interface{} {
+               return new(bytes.Buffer)
+       }}
+       // the size of each segment file should be a multiple of the page size.
+       pageSize := os.Getpagesize()
+       if q.SegmentSize%pageSize != 0 {
+               q.SegmentSize -= q.SegmentSize % pageSize
+       }
+       if q.SegmentSize/pageSize == 0 {
+               q.SegmentSize = 131072
+       }
+       // the minimum MaxInMemSegments value should be 4.
+       if q.MaxInMemSegments < 4 {
+               q.MaxInMemSegments = 4
+       }
+       // load metadata and override the reading or writing offset by the 
committed or watermark offset.
+       md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+       if err != nil {
+               return fmt.Errorf("error in creating the metadata: %v", err)
+       }
+       q.meta = md
+       cmID, cmOffset := md.GetCommittedOffset()
+       wmID, wmOffset := md.GetWatermarkOffset()
+       md.PutWritingOffset(wmID, wmOffset)
+       md.PutReadingOffset(cmID, cmOffset)
+       // keep the reading or writing segments in the memory.
+       q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+       if _, err := q.GetSegment(cmID); err != nil {
+               return err
+       }
+       if _, err := q.GetSegment(wmID); err != nil {
+               return err
+       }
+       // init components
+       q.insufficientMemChannel = make(chan struct{})
+       q.sufficientMemChannel = make(chan struct{})
+       q.flushChannel = make(chan struct{})
+       q.ctx, q.cancel = context.WithCancel(context.Background())
+       // async supported processes.
+       q.showDownWg.Add(2)
+       go q.segmentSwapper()
+       go q.flush()
+       return nil
+}
+
+func (q *Queue) Push(e *event.Event) error {

Review comment:
       I understand that the tests have been fully covered




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to