surlymo commented on a change in pull request #10:
URL: 
https://github.com/apache/skywalking-satellite/pull/10#discussion_r548939173



##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to 
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in 
all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 
THE
+// SOFTWARE.
+
+package mmap
+
+import (
+       "fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of 
the
+// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file 
retains
+// the original author license.
+//
+// The reason why we references the source codes of bigqueue rather than using 
the lib
+// is the file queue in Satellite is like following.
+// 1. Only one consumer and publisher in the Satellite queue.
+// 2. Reusing files strategy is required to reduce the creation times in the 
Satellite queue.
+// 3. More complex OFFSET design is needed to ensure the final stability of 
data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of 
the data,
+// then the data itself. It means the whole data may not exist in the one 
segments.
+func (q *Queue) push(bytes []byte) error {
+       if q.isFull() {
+               return fmt.Errorf("cannot push data when the queue is full")
+       }
+       id, offset := q.meta.GetWritingOffset()
+       id, offset, err := q.writeLength(len(bytes), id, offset)
+       if err != nil {
+               return err
+       }
+       id, offset, err = q.writeBytes(bytes, id, offset)
+       if err != nil {
+               return err
+       }
+       q.meta.PutWritingOffset(id, offset)
+       q.unflushedNum++
+       if q.unflushedNum == q.FlushCeilingNum {
+               q.flushChannel <- struct{}{}
+               q.unflushedNum = 0
+       }
+       return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the 
data,
+// then the data itself. It means the whole data may not exist in the one 
segments.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+       if q.isEmpty() {
+               return nil, 0, 0, fmt.Errorf("cannot read data when the queue 
is empty")
+       }
+       id, offset := q.meta.GetReadingOffset()
+       id, offset, length, err := q.readLength(id, offset)
+       if err != nil {
+               return nil, 0, 0, err
+       }
+       bytes, id, offset, err := q.readBytes(id, offset, length)
+       if err != nil {
+               return nil, 0, 0, err
+       }
+       q.meta.PutReadingOffset(id, offset)
+       return bytes, id, offset, nil
+}
+
+// readBytes reads bytes into the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, 
newOffset int64, err error) {
+       counter := 0
+       res := make([]byte, length)
+       for {
+               segment, err := q.GetSegment(id)
+               if err != nil {
+                       return nil, 0, 0, err
+               }
+               readBytes, err := segment.ReadAt(res[counter:], offset)
+               if err != nil {
+                       return nil, 0, 0, err
+               }
+               counter += readBytes
+               offset += int64(readBytes)
+               if offset == int64(q.SegmentSize) {
+                       id, offset = id+1, 0
+               }
+               if counter == length {
+                       break
+               }
+       }
+       return res, id, offset, nil
+}
+
+// readLength reads the data length with 8 Bits spaces.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length 
int, err error) {
+       if offset+uInt64Size > int64(q.SegmentSize) {
+               id, offset = id+1, 0
+       }
+       segment, err := q.GetSegment(id)
+       if err != nil {
+               return 0, 0, 0, err
+       }
+       num := segment.ReadUint64At(offset)
+       offset += uInt64Size
+       if offset == int64(q.SegmentSize) {
+               id, offset = id+1, 0
+       }
+       return id, offset, int(num), nil
+}
+
+// writeLength write the data length with 8 Bits spaces.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset 
int64, err error) {
+       if offset+uInt64Size > int64(q.SegmentSize) {

Review comment:
       remainSegmentSize?

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+       "context"
+       "fmt"
+       "path"
+       "strconv"
+       "sync/atomic"
+
+       "github.com/grandecola/mmap"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+       index := q.GetIndex(segmentID)
+       if q.mmapCount >= q.MaxInMemSegments {
+               q.insufficientMemChannel <- struct{}{}
+               <-q.sufficientMemChannel
+       }
+       if err := q.mapSegment(segmentID); err != nil {
+               return nil, err
+       }
+       if q.segments[index] != nil {
+               return q.segments[index], nil
+       }
+       return nil, fmt.Errorf("cannot get a memory mapped file at %d segment", 
segmentID)
+}
+
+// mapSegment load the segment file reference to the segments.
+func (q *Queue) mapSegment(segmentID int64) error {
+       index := q.GetIndex(segmentID)
+       if q.segments[index] != nil {
+               return nil
+       }
+       filePath := path.Join(q.QueueDir, 
strconv.Itoa(index)+segment.FileSuffix)

Review comment:
       file capacity threadhold management mechanism?

##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to 
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in 
all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 
THE
+// SOFTWARE.
+
+package mmap
+
+import (
+       "fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of 
the
+// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file 
retains
+// the original author license.
+//
+// The reason why we references the source codes of bigqueue rather than using 
the lib
+// is the file queue in Satellite is like following.
+// 1. Only one consumer and publisher in the Satellite queue.
+// 2. Reusing files strategy is required to reduce the creation times in the 
Satellite queue.
+// 3. More complex OFFSET design is needed to ensure the final stability of 
data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of 
the data,
+// then the data itself. It means the whole data may not exist in the one 
segments.
+func (q *Queue) push(bytes []byte) error {
+       if q.isFull() {
+               return fmt.Errorf("cannot push data when the queue is full")
+       }
+       id, offset := q.meta.GetWritingOffset()
+       id, offset, err := q.writeLength(len(bytes), id, offset)
+       if err != nil {
+               return err
+       }
+       id, offset, err = q.writeBytes(bytes, id, offset)
+       if err != nil {
+               return err
+       }
+       q.meta.PutWritingOffset(id, offset)
+       q.unflushedNum++
+       if q.unflushedNum == q.FlushCeilingNum {
+               q.flushChannel <- struct{}{}
+               q.unflushedNum = 0
+       }
+       return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the 
data,
+// then the data itself. It means the whole data may not exist in the one 
segments.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+       if q.isEmpty() {
+               return nil, 0, 0, fmt.Errorf("cannot read data when the queue 
is empty")
+       }
+       id, offset := q.meta.GetReadingOffset()
+       id, offset, length, err := q.readLength(id, offset)
+       if err != nil {
+               return nil, 0, 0, err
+       }
+       bytes, id, offset, err := q.readBytes(id, offset, length)
+       if err != nil {
+               return nil, 0, 0, err
+       }
+       q.meta.PutReadingOffset(id, offset)
+       return bytes, id, offset, nil
+}
+
+// readBytes reads bytes into the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, 
newOffset int64, err error) {
+       counter := 0
+       res := make([]byte, length)
+       for {
+               segment, err := q.GetSegment(id)
+               if err != nil {
+                       return nil, 0, 0, err
+               }
+               readBytes, err := segment.ReadAt(res[counter:], offset)
+               if err != nil {
+                       return nil, 0, 0, err
+               }
+               counter += readBytes
+               offset += int64(readBytes)
+               if offset == int64(q.SegmentSize) {
+                       id, offset = id+1, 0
+               }
+               if counter == length {
+                       break
+               }
+       }
+       return res, id, offset, nil
+}
+
+// readLength reads the data length with 8 Bits spaces.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length 
int, err error) {
+       if offset+uInt64Size > int64(q.SegmentSize) {
+               id, offset = id+1, 0
+       }
+       segment, err := q.GetSegment(id)
+       if err != nil {
+               return 0, 0, 0, err
+       }
+       num := segment.ReadUint64At(offset)
+       offset += uInt64Size
+       if offset == int64(q.SegmentSize) {
+               id, offset = id+1, 0
+       }
+       return id, offset, int(num), nil
+}
+
+// writeLength write the data length with 8 Bits spaces.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset 
int64, err error) {

Review comment:
       how to deal with while the single request bytes is greater than a 
segment size or remain segment ? show the code

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+       "context"
+       "fmt"
+       "path"
+       "strconv"
+       "sync/atomic"
+
+       "github.com/grandecola/mmap"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+       index := q.GetIndex(segmentID)
+       if q.mmapCount >= q.MaxInMemSegments {
+               q.insufficientMemChannel <- struct{}{}
+               <-q.sufficientMemChannel
+       }
+       if err := q.mapSegment(segmentID); err != nil {

Review comment:
       why mapsegment?

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+       "context"
+       "fmt"
+       "path"
+       "strconv"
+       "sync/atomic"
+
+       "github.com/grandecola/mmap"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+       index := q.GetIndex(segmentID)
+       if q.mmapCount >= q.MaxInMemSegments {

Review comment:
       why execute it here? why not just using timer to trigger

##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to 
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in 
all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 
THE
+// SOFTWARE.
+
+package mmap
+
+import (
+       "fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of 
the
+// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file 
retains
+// the original author license.
+//
+// The reason why we references the source codes of bigqueue rather than using 
the lib
+// is the file queue in Satellite is like following.
+// 1. Only one consumer and publisher in the Satellite queue.
+// 2. Reusing files strategy is required to reduce the creation times in the 
Satellite queue.
+// 3. More complex OFFSET design is needed to ensure the final stability of 
data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of 
the data,
+// then the data itself. It means the whole data may not exist in the one 
segments.
+func (q *Queue) push(bytes []byte) error {
+       if q.isFull() {
+               return fmt.Errorf("cannot push data when the queue is full")
+       }
+       id, offset := q.meta.GetWritingOffset()
+       id, offset, err := q.writeLength(len(bytes), id, offset)
+       if err != nil {
+               return err
+       }
+       id, offset, err = q.writeBytes(bytes, id, offset)
+       if err != nil {
+               return err
+       }
+       q.meta.PutWritingOffset(id, offset)
+       q.unflushedNum++
+       if q.unflushedNum == q.FlushCeilingNum {
+               q.flushChannel <- struct{}{}
+               q.unflushedNum = 0
+       }
+       return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the 
data,
+// then the data itself. It means the whole data may not exist in the one 
segments.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+       if q.isEmpty() {
+               return nil, 0, 0, fmt.Errorf("cannot read data when the queue 
is empty")
+       }
+       id, offset := q.meta.GetReadingOffset()
+       id, offset, length, err := q.readLength(id, offset)
+       if err != nil {
+               return nil, 0, 0, err
+       }
+       bytes, id, offset, err := q.readBytes(id, offset, length)
+       if err != nil {
+               return nil, 0, 0, err
+       }
+       q.meta.PutReadingOffset(id, offset)
+       return bytes, id, offset, nil
+}
+
+// readBytes reads bytes into the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, 
newOffset int64, err error) {
+       counter := 0
+       res := make([]byte, length)
+       for {
+               segment, err := q.GetSegment(id)
+               if err != nil {
+                       return nil, 0, 0, err
+               }
+               readBytes, err := segment.ReadAt(res[counter:], offset)
+               if err != nil {
+                       return nil, 0, 0, err
+               }
+               counter += readBytes
+               offset += int64(readBytes)
+               if offset == int64(q.SegmentSize) {
+                       id, offset = id+1, 0
+               }
+               if counter == length {
+                       break
+               }
+       }
+       return res, id, offset, nil
+}
+
+// readLength reads the data length with 8 Bits spaces.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length 
int, err error) {
+       if offset+uInt64Size > int64(q.SegmentSize) {
+               id, offset = id+1, 0
+       }
+       segment, err := q.GetSegment(id)
+       if err != nil {
+               return 0, 0, 0, err
+       }
+       num := segment.ReadUint64At(offset)
+       offset += uInt64Size
+       if offset == int64(q.SegmentSize) {
+               id, offset = id+1, 0
+       }
+       return id, offset, int(num), nil
+}
+
+// writeLength write the data length with 8 Bits spaces.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset 
int64, err error) {
+       if offset+uInt64Size > int64(q.SegmentSize) {

Review comment:
       change the name if yes

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+       "context"
+       "fmt"
+       "path"
+       "strconv"
+       "sync/atomic"
+
+       "github.com/grandecola/mmap"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {

Review comment:
       is the queue support concurrency situation? any problem will occur in 
this situation?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to