NiklasMerz commented on a change in pull request #124: URL: https://github.com/apache/iotdb/pull/124#discussion_r721506115
########## File path: tsfile-go/src/tsfile/common/utils/ByteUtils.go ########## @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package utils + +import ( + "bytes" +) + +// get one bit in input byte. the offset is from low to high and start with 0 +// e.g.<br> +// data:16(00010000), if offset is 4, return 1(000 "1" 0000) if offset is 7, return 0("0" 0010000) +func getByteN(data byte, offset int) int { + offset &= 0x7 + + if (data & (1 << uint32(7-offset))) != 0 { + return 1 + } else { + return 0 + } +} + +/** + * set one bit in input byte. the offset is from low to high and start with + * index 0<br> + * e.g.<br> + * data:16(00010000), if offset is 4, value is 0, return 0({000 "0" 0000}) + * if offset is 1, value is 1, return 18({00010010}) if offset is 0, value + * is 0, return 16(no change) + * + * @param data input byte variable + * @param offset bit offset + * @param value value to set + * @return byte variable + */ +func setByteN(data byte, offset int, value int) byte { + offset &= 0x7 + + if value == 1 { + return (byte)(data | (1 << uint32(7-offset))) + } else { + return (byte)(data & ^(1 << uint32(7-offset))) + } +} + +/** + * get one bit in input integer. 
the offset is from low to high and start + * with 0<br> + * e.g.<br> + * data:1000(00000000 00000000 00000011 11101000), if offset is 4, return + * 0(111 "0" 1000) if offset is 9, return 1(00000 "1" 1 11101000) + * + * @param data input int variable + * @param offset bit offset + * @return 0 or 1 + */ +func getIntN(data int32, offset int) int32 { + offset &= 0x1f + + if (data & (1 << uint32(offset))) != 0 { + return 1 + } else { + return 0 + } +} + +// set one bit in input integer. the offset is from low to high and start with index 0 +// e.g.<br> +// data:1000({00000000 00000000 00000011 11101000}), +// if offset is 4, value is 1, return 1016({00000000 00000000 00000011 111 "1" 1000}) +// if offset is 9, value is 0 return 488({00000000 00000000 000000 "0" 1 11101000}) +// if offset is 0, value is 0 return 1000(no change) +func setIntN(data int32, offset int, value int) int32 { + offset &= 0x1f + + if value == 1 { + return (data | (1 << uint32(offset))) + } else { + return (data & ^(1 << uint32(offset))) + } +} + +/** + * get one bit in input long. the offset is from low to high and start with + * 0<br> + * + * @param data input long variable + * @param offset bit offset + * @return 0/1 + */ +func getLongN(data int64, offset int) int32 { + offset &= 0x3f + + if (data & (int64(1) << uint32(offset))) != 0 { + return 1 + } else { + return 0 + } +} + +// set one bit in input long. the offset is from low to high and start with index 0 +func setLongN(data int64, offset int, value int) int64 { + offset &= 0x3f + + if value == 1 { + return (data | (1 << uint32(offset))) + } else { + return (data & ^(1 << uint32(offset))) + } +} + +// given a byte array, read width bits from specified position bits and convert it to an integer +func BytesToInt(data []byte, pos int, width int) int32 { + var value int32 = 0 + + offset := pos + width - 1 + for i := 0; i < width; i++ { + index := offset - i + //value = setIntN(value, i, getByteN(data[index/8], index)) Review comment: old code? 
########## File path: tsfile-go/src/tsfile/TsFileSequenceReader.go ########## @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + Review comment: This file has no `test` in the name. ########## File path: tsfile-go/src/tsfile/common/conf/FileConfig.go ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package conf + +import ( + "bufio" + "log" + "os" + "strconv" + "strings" +) + +const ( + CONFIG_FILE_NAME string = "tsfile-format.properties" + + MAGIC_STRING string = "TsFilev0.8.0" + + // Default bit width of RLE encoding is 8 + RLE_MIN_REPEATED_NUM int = 8 + RLE_MAX_REPEATED_NUM int = 0x7FFF + RLE_MAX_BIT_PACKED_NUM int = 63 + + // Gorilla encoding configuration + FLOAT_LENGTH int = 32 + FLAOT_LEADING_ZERO_LENGTH int = 5 + FLOAT_VALUE_LENGTH int = 6 + DOUBLE_LENGTH int = 64 + DOUBLE_LEADING_ZERO_LENGTH int = 6 + DOUBLE_VALUE_LENGTH int = 7 +) + +// Memory size threshold for flushing to disk or HDFS, default value is 128MB +var GroupSizeInByte int = 128 * 1024 * 1024 + +// The memory size for each series writer to pack a page, default value is 64KB +var PageSizeInByte int = 64 * 1024 + +// The maximum number of data points in a page, default value is 1024 * 1024 +var MaxNumberOfPointsInPage int = 1024 * 1024 + +// Data type for input timestamp, TsFile supports INT32 or INT64 +var TimeSeriesDataType string = "INT64" + +// Max length limitation of input string +var MaxStringLength int = 128 + +// Floating-point precision +var FloatPrecision int = 2 + +// Encoder of time series, TsFile supports TS_2DIFF, PLAIN and RLE(run-length encoding) +var TimeSeriesEncoder string = "TS_2DIFF" + +// Encoder of value series, default value is PLAIN. +var ValueEncoder string = "PLAIN" + +var Compressor string = "UNCOMPRESSED" + +// Default block size of two-diff delta encoding is 128 +var DeltaBlockSize = 128 + +// Current version is 3 +var CurrentVersion = 3 + +/** +* String encoder with UTF-8 encodes a character to at most 4 bytes.
+ */ +var BYTE_SIZE_PER_CHAR int = 4 + +func init() { + loadProperties() +} + +func loadProperties() { + file, err := os.Open(CONFIG_FILE_NAME) + defer file.Close() + if err != nil { + log.Println("Warn:", err) + return + } + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + if k, v := loadItem(scanner.Text()); v != "" { + switch { + case k == "group_size_in_byte": + GroupSizeInByte, _ = strconv.Atoi(v) + case k == "page_size_in_byte": + PageSizeInByte, _ = strconv.Atoi(v) + case k == "max_number_of_points_in_page": + MaxNumberOfPointsInPage, _ = strconv.Atoi(v) + case k == "time_series_data_type": + TimeSeriesDataType = v + case k == "max_string_length": + MaxStringLength, _ = strconv.Atoi(v) + case k == "float_precision": + FloatPrecision, _ = strconv.Atoi(v) + case k == "time_series_encoder": + TimeSeriesEncoder = v + case k == "value_encoder": + ValueEncoder = v + case k == "compressor": + Compressor = v + } + } + } + + if err := scanner.Err(); err != nil { + log.Fatal(err) + } +} + +var is_comment bool = false Review comment: Can this variable be in `loadItem`? ########## File path: tsfile-go/src/tsfile/common/utils/ByteUtils.go ########## @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package utils + +import ( + "bytes" +) + +// get one bit in input byte. the offset is from low to high and start with 0 +// e.g.<br> +// data:16(00010000), if offset is 4, return 1(000 "1" 0000) if offset is 7, return 0("0" 0010000) +func getByteN(data byte, offset int) int { + offset &= 0x7 + + if (data & (1 << uint32(7-offset))) != 0 { + return 1 + } else { + return 0 + } +} + +/** + * set one bit in input byte. the offset is from low to high and start with + * index 0<br> + * e.g.<br> + * data:16(00010000), if offset is 4, value is 0, return 0({000 "0" 0000}) + * if offset is 1, value is 1, return 18({00010010}) if offset is 0, value + * is 0, return 16(no change) + * + * @param data input byte variable + * @param offset bit offset + * @param value value to set + * @return byte variable + */ +func setByteN(data byte, offset int, value int) byte { + offset &= 0x7 + + if value == 1 { + return (byte)(data | (1 << uint32(7-offset))) + } else { + return (byte)(data & ^(1 << uint32(7-offset))) + } +} + +/** + * get one bit in input integer. the offset is from low to high and start + * with 0<br> + * e.g.<br> + * data:1000(00000000 00000000 00000011 11101000), if offset is 4, return + * 0(111 "0" 1000) if offset is 9, return 1(00000 "1" 1 11101000) + * + * @param data input int variable + * @param offset bit offset + * @return 0 or 1 + */ +func getIntN(data int32, offset int) int32 { + offset &= 0x1f + + if (data & (1 << uint32(offset))) != 0 { + return 1 + } else { + return 0 + } +} + +// set one bit in input integer. 
the offset is from low to high and start with index 0 +// e.g.<br> +// data:1000({00000000 00000000 00000011 11101000}), +// if offset is 4, value is 1, return 1016({00000000 00000000 00000011 111 "1" 1000}) +// if offset is 9, value is 0 return 488({00000000 00000000 000000 "0" 1 11101000}) +// if offset is 0, value is 0 return 1000(no change) +func setIntN(data int32, offset int, value int) int32 { + offset &= 0x1f + + if value == 1 { + return (data | (1 << uint32(offset))) + } else { + return (data & ^(1 << uint32(offset))) + } +} + +/** + * get one bit in input long. the offset is from low to high and start with + * 0<br> + * + * @param data input long variable + * @param offset bit offset + * @return 0/1 + */ +func getLongN(data int64, offset int) int32 { + offset &= 0x3f + + if (data & (int64(1) << uint32(offset))) != 0 { + return 1 + } else { + return 0 + } +} + +// set one bit in input long. the offset is from low to high and start with index 0 +func setLongN(data int64, offset int, value int) int64 { + offset &= 0x3f + + if value == 1 { + return (data | (1 << uint32(offset))) + } else { + return (data & ^(1 << uint32(offset))) + } +} + +// given a byte array, read width bits from specified position bits and convert it to an integer +func BytesToInt(data []byte, pos int, width int) int32 { + var value int32 = 0 + + offset := pos + width - 1 + for i := 0; i < width; i++ { + index := offset - i + //value = setIntN(value, i, getByteN(data[index/8], index)) + + if (data[index/8] & (1 << uint32(7-index&7))) != 0 { + value = (value | (1 << uint32(i&0x1f))) + } else { + value = (value & ^(1 << uint32(i&0x1f))) + } + } + + return value +} + +// given a byte array, read width bits from specified pos bits and convert it to an long +func BytesToLong(data []byte, pos int, width int) int64 { + var value int64 = 0 + + offset := pos + width - 1 + for i := 0; i < width; i++ { + index := offset - i + //value = setLongN(value, i, getByteN(data[index/8], index)) + + if 
(data[index/8] & (1 << uint32(7-index&7))) != 0 { + value = (value | (1 << uint32(i&0x3f))) + } else { + value = (value & ^(1 << uint32(i&0x3f))) + } + } + + return value +} + +/** + * convert an integer to a byte array which length is width, then copy this + * array to the parameter result from pos + * + * @param srcNum input integer variable + * @param result byte array to convert + * @param pos start position + * @param width bit-width + */ +func IntToBytes(srcNum int32, result []byte, pos int, width int) { + offset := pos + width - 1 + + for i := 0; i < width; i++ { + temp := int32(offset-i) / 8 + result[temp] = setByteN(result[temp], offset-i, int(getIntN(srcNum, i))) + } +} + +/** + * convert an long to a byte array which length is width, then copy this + * array to the parameter result from pos + * + * @param srcNum input long variable + * @param result byte array to convert + * @param pos start position + * @param width bit-width + */ +func LongToBytes(srcNum int64, result []byte, pos int, width int) { + offset := pos + width - 1 + + for i := 0; i < width; i++ { + temp := (offset - i) / 8 + result[temp] = setByteN(result[temp], offset-i, int(getLongN(srcNum, i))) + } +} + +func NumberOfLeadingZeros(i int32) int32 { + if i == 0 { + return 32 + } + + var n int32 = 1 + if uint32(i)>>16 == 0 { + n += 16 + i <<= 16 + } + if uint32(i)>>24 == 0 { + n += 8 + i <<= 8 + } + if uint32(i)>>28 == 0 { + n += 4 + i <<= 4 + } + if uint32(i)>>30 == 0 { + n += 2 + i <<= 2 + } + n -= int32(uint32(i) >> 31) + + return n +} + +func NumberOfTrailingZeros(i int32) int32 { + if i == 0 { + return 32 + } + + var y int32 + var n int32 = 31 + y = i << 16 + if y != 0 { + n = n - 16 + i = y + } + y = i << 8 + if y != 0 { + n = n - 8 + i = y + } + y = i << 4 + if y != 0 { + n = n - 4 + i = y + } + y = i << 2 + if y != 0 { + n = n - 2 + i = y + } + + return n - int32(uint32(i<<1)>>31) +} + +func NumberOfLeadingZerosLong(i int64) int32 { + if i == 0 { + return 64 + } + + var n int32 = 1 + 
var x int32 = int32(uint64(i) >> 32) + + if x == 0 { + n += 32 + x = int32(i) + } + if uint32(x)>>16 == 0 { + n += 16 + x <<= 16 + } + if uint32(x)>>24 == 0 { + n += 8 + x <<= 8 + } + if uint32(x)>>28 == 0 { + n += 4 + x <<= 4 + } + if uint32(x)>>30 == 0 { + n += 2 + x <<= 2 + } + n -= int32(uint32(x) >> 31) + + return n +} + +func NumberOfTrailingZerosLong(i int64) int32 { + if i == 0 { + return 64 + } + + var x, y int32 + var n int32 = 63 + y = int32(i) + + if y != 0 { + n = n - 32 + x = y + } else { + x = (int32)(uint64(i) >> 32) + } + y = x << 16 + if y != 0 { + n = n - 16 + x = y + } + y = x << 8 + if y != 0 { + n = n - 8 + x = y + } + y = x << 4 + if y != 0 { + n = n - 4 + x = y + } + y = x << 2 + if y != 0 { + n = n - 2 + x = y + } + + return n - int32(uint32(x<<1)>>31) +} + +/** +* write a value to stream using unsigned var int format. +* for example, +* int 123456789 has its binary format 111010-1101111-0011010-0010101, +* function WriteUnsignedVarInt() will split every seven bits and write them to stream from low bit to high bit like: +* 1-0010101 1-0011010 1-1101111 0-0111010 +* 1 represents has next byte to write, 0 represents number end. + */ +func WriteUnsignedVarInt(value int32, buffer *bytes.Buffer) { + var position int32 = 1 + + for (value & 0x7FFFFF80) != 0 { + buffer.WriteByte(byte((value & 0x7F) | 0x80)) + value = int32(uint32(value) >> 7) + position++ + } + + buffer.WriteByte(byte(value & 0x7F)) +} + +func WriteIntLittleEndianPaddedOnBitWidth(value int32, out *bytes.Buffer, bitWidth int) { + paddedByteNum := (bitWidth + 7) / 8 + var offset uint8 = 0 + for { + if paddedByteNum <= 0 { + break + } + out.WriteByte(byte(value>>offset) & 0xFF) + offset += 8 + paddedByteNum-- + } +} + +func WriteLongLittleEndianPaddedOnBitWidth(value int64, out *bytes.Buffer, bitWidth int) { + //paddedByteNum := (bitWidth + 7) / 8; Review comment: old code? 
########## File path: tsfile-go/src/tsfile/common/utils/DataTypeConvert.go ########## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package utils + +import ( + "bytes" + "encoding/binary" + "math" + "tsfile/common/log" +) + +/** + * @Package Name: utils + * @Author: steven yao + * @Email: [email protected] + * @Create Date: 18-9-3 下午5:04 + * @Description: + */ + +// bool +func BoolToByte(flag bool, endianType int16) []byte { + var buffer bytes.Buffer + var err error + if endianType == 0 { // BigEdian + err = binary.Write(&buffer, binary.BigEndian, flag) + } else { // LittleEdian + err = binary.Write(&buffer, binary.LittleEndian, flag) + } + + if err != nil { + log.Error("BoolToByte error : %s", err) + return nil + } + return buffer.Bytes() +} + +// int +func Int64ToByte(num int64, endianType int16) []byte { + var buffer bytes.Buffer + var err error + if endianType == 0 { + err = binary.Write(&buffer, binary.BigEndian, num) + } else { + err = binary.Write(&buffer, binary.LittleEndian, num) + } + + if err != nil { + log.Error("Int64ToByte error : %s", err) + return nil + } + return buffer.Bytes() +} + +//func Int64ToByteLittleEndian(num int64) []byte { Review comment: old code? 
########## File path: tsfile-go/src/tsfile/encoding/bitpacking/LongPacker.go ########## @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package bitpacking + +import ( + _ "log" +) + +/** + * This class is used to encode(decode) Long in Java with specified bit-width. Review comment: Java or Go? ########## File path: tsfile-go/src/tsfile/encoding/decoder/IntRleDecoder.go ########## @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package decoder + +import ( + _ "bytes" + _ "log" + _ "strconv" + "tsfile/common/conf" + "tsfile/common/constant" + "tsfile/common/utils" + "tsfile/encoding/bitpacking" +) + +// rle-bit-packing-hybrid: <length> <bitwidth> <encoded-data> +// length := length of the <bitwidth> <encoded-data> in bytes stored as 4 bytes little endian +// bitwidth := bitwidth for all encoded data in <encoded-data> +// encoded-data := <bit-packed-run> | <rle-run> +// bit-packed-run := <bit-packed-header> <lastBitPackedNum> <bit-packed-values> +// bit-packed-header := varint-encode(<bit-pack-count> << 1 | 1) +// lastBitPackedNum := the number of useful value in last bit-pack may be less than 8, so lastBitPackedNum indicates how many values are useful +// bit-packed-values := bit packed +// rle-run := <rle-header> <repeated-value> +// rle-header := varint-encode( (number of times repeated) << 1) +// repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) + +type IntRleDecoder struct { + endianType constant.EndianType + dataType constant.TSDataType + + reader *utils.BytesReader + packer *bitpacking.IntPacker + + packageReader *utils.BytesReader + // how many bytes for all encoded data + length int + // bit width for bit-packing and rle to decode + bitWidth int + // number of data left for reading in current buffer + currentCount int + // mode to indicate current encoding type + mode int + // number of bit-packing group in which is saved in header + bitPackingNum int + + currentValue int32 + decodedValues []int32 + + isReadingBegan bool +} + +func (d *IntRleDecoder) Init(data []byte) { + d.reader = utils.NewBytesReader(data) + d.currentCount = 0 + d.currentValue = 0 + d.isReadingBegan = false +} + +func (d *IntRleDecoder) HasNext() bool { + if d.currentCount > 0 || d.reader.Len() > 0 || d.packageReader.Len() > 0 { + return true + } + 
return false +} + +func (d *IntRleDecoder) Next() interface{} { + if !d.isReadingBegan { + // read length and bit width of current package before we decode number + d.length = int(d.reader.ReadUnsignedVarInt()) + + d.packageReader = utils.NewBytesReader(d.reader.ReadSlice(int(d.length))) + d.bitWidth = int(d.packageReader.Read()) + + d.packer = &bitpacking.IntPacker{BitWidth: d.bitWidth} + + d.isReadingBegan = true + } + + if d.currentCount == 0 { + d.readPackage() + } + + d.currentCount-- + + var result int32 = 0 + switch d.mode { + case RLE: + result = d.currentValue + break + case BIT_PACKED: + result = d.decodedValues[d.bitPackingNum-d.currentCount-1] + break + default: + panic("tsfile-encoding IntRleDecoder: not a valid mode") + } + + // if d.currentCount > 0 || d.packageReader.Len() <= 0 { Review comment: old code? ########## File path: tsfile-go/src/tsfile/common/utils/DataTypeConvert.go ########## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package utils + +import ( + "bytes" + "encoding/binary" + "math" + "tsfile/common/log" +) + +/** + * @Package Name: utils + * @Author: steven yao + * @Email: [email protected] + * @Create Date: 18-9-3 下午5:04 + * @Description: + */ + +// bool +func BoolToByte(flag bool, endianType int16) []byte { + var buffer bytes.Buffer + var err error + if endianType == 0 { // BigEdian + err = binary.Write(&buffer, binary.BigEndian, flag) + } else { // LittleEdian + err = binary.Write(&buffer, binary.LittleEndian, flag) + } + + if err != nil { + log.Error("BoolToByte error : %s", err) + return nil + } + return buffer.Bytes() +} + +// int +func Int64ToByte(num int64, endianType int16) []byte { + var buffer bytes.Buffer + var err error + if endianType == 0 { + err = binary.Write(&buffer, binary.BigEndian, num) + } else { + err = binary.Write(&buffer, binary.LittleEndian, num) + } + + if err != nil { + log.Error("Int64ToByte error : %s", err) + return nil + } + return buffer.Bytes() +} + +//func Int64ToByteLittleEndian(num int64) []byte { +// var buffer bytes.Buffer +// err := binary.Write(&buffer, binary.LittleEndian, num) +// if err != nil { +// log.Error("Int64ToByte error : %s", err) +// return nil +// } +// return buffer.Bytes() +//} + +func Int32ToByte(num int32, endianType int16) []byte { + var buffer bytes.Buffer + var err error + if endianType == 0 { + err = binary.Write(&buffer, binary.BigEndian, num) + } else { + err = binary.Write(&buffer, binary.LittleEndian, num) + } + + if err != nil { + log.Error("Int32ToByte error : %s", err) + return nil + } + return buffer.Bytes() +} + +//func Int32ToByteLittleEndian(num int32) []byte { Review comment: old code? ########## File path: tsfile-go/src/tsfile/encoding/encoder/DoublePrecisionEncoder.go ########## @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package encoder + +import ( + "bytes" + "encoding/binary" + "math" + "tsfile/common/conf" + "tsfile/common/constant" + "tsfile/common/utils" +) + +type DoublePrecisionEncoder struct { + encoding constant.TSEncoding + dataType constant.TSDataType + + base *GorillaEncoder + preValue int64 +} + +func (d *DoublePrecisionEncoder) Encode(v interface{}, buffer *bytes.Buffer) { + base := d.base + if !base.flag { + // case: write first 8 byte value without any encoding + base.flag = true + d.preValue = int64(math.Float64bits(v.(float64))) + base.leadingZeroNum = utils.NumberOfLeadingZerosLong(d.preValue) + base.tailingZeroNum = utils.NumberOfTrailingZerosLong(d.preValue) + binary.Write(buffer, binary.LittleEndian, d.preValue) + //var bufferLittle []byte Review comment: old code? ########## File path: tsfile-go/src/tsfile/encoding/decoder/LongRleDecoder.go ########## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package decoder + +import ( + _ "bytes" + _ "log" + _ "strconv" + "tsfile/common/conf" + "tsfile/common/constant" + "tsfile/common/utils" + "tsfile/encoding/bitpacking" +) + +// rle-bit-packing-hybrid: <length> <bitwidth> <encoded-data> Review comment: old code? ########## File path: tsfile-go/src/tsfile/encoding/decoder/IntRleDecoder.go ########## @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package decoder + +import ( + _ "bytes" + _ "log" + _ "strconv" + "tsfile/common/conf" + "tsfile/common/constant" + "tsfile/common/utils" + "tsfile/encoding/bitpacking" +) + +// rle-bit-packing-hybrid: <length> <bitwidth> <encoded-data> Review comment: old code? 
########## File path: tsfile-go/src/tsfile/common/utils/FileReader.go ########## @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package utils + +import ( + "encoding/binary" + "io" + _ "log" + "math" + "os" + "tsfile/common/constant" +) + +// file stream reader with buffer, supports random reading +const SIZE_BUF = 1024 * 8 + +type FileReader struct { + reader *os.File + pos int64 // file position + b []byte // buffer + l int // buffer len + p int // buffer read position +} + +func NewFileReader(reader *os.File) *FileReader { + f := &FileReader{reader: reader} + f.pos = 0 + f.l = 0 + f.p = 0 + f.b = make([]byte, SIZE_BUF) + if n, err := f.reader.Read(f.b); err == nil || err == io.EOF { + f.l = n + } else { + panic(err) + } + + return f +} + +func (f *FileReader) Close() error { + return f.reader.Close() +} + +func (f *FileReader) ReadSlice(length int) []byte { + if length <= SIZE_BUF { // buffer size is at least as large as the reading size, so we get data from the buffer + // remaining buffered data is not enough, so we need to read data from the file into the buffer first + if f.l-f.p < length { + if f.l > f.p { + copy(f.b[0:], f.b[f.p:f.l]) + } + f.l -= f.p + f.p = 0 + + if n, err := f.reader.Read(f.b[f.l:]); err ==
nil || err == io.EOF { + f.l += n + if f.l < length { + panic("file has no enough data to read") + } + } else { + panic(err) + } + } + + result := f.b[f.p : f.p+length] + f.p += length + f.pos += int64(length) + + return result + } else { // buffer size less than reading size, so we need to read data from file, and discard buffer + // get available data from buffer first + result := make([]byte, length) + if f.l > f.p { + copy(result[0:], f.b[f.p:f.l]) + } + + remaining := f.l - f.p + if n, err := f.reader.Read(result[remaining:]); err == nil || err == io.EOF { + f.l = 0 + f.p = 0 + f.pos += int64(n + remaining) + } else { + panic(err) + } + + return result + } +} + +func (f *FileReader) ReadBool() bool { + buf := f.ReadSlice(constant.BOOLEAN_LEN) + result := (buf[0] == 1) + + return result +} + +func (f *FileReader) ReadShort() int16 { + buf := f.ReadSlice(constant.SHORT_LEN) + result := int16(binary.BigEndian.Uint16(buf)) + + return result +} + +func (f *FileReader) ReadInt() int32 { + buf := f.ReadSlice(constant.INT_LEN) + result := int32(binary.BigEndian.Uint32(buf)) //to int32, then to int('cause int==int64 on x64) + + return result +} + +func (f *FileReader) ReadLong() int64 { + buf := f.ReadSlice(constant.LONG_LEN) + result := int64(binary.BigEndian.Uint64(buf)) + + return result +} + +func (f *FileReader) ReadFloat() float32 { + buf := f.ReadSlice(constant.FLOAT_LEN) + bits := binary.BigEndian.Uint32(buf) + result := math.Float32frombits(bits) + + return result +} + +func (f *FileReader) ReadDouble() float64 { + buf := f.ReadSlice(constant.DOUBLE_LEN) + bits := binary.BigEndian.Uint64(buf) + result := math.Float64frombits(bits) + + return result +} + +func (f *FileReader) ReadString() string { + length := f.ReadInt() + buf := f.ReadSlice(int(length)) + result := string(buf) + + return result +} + +func (f *FileReader) ReadStringBinary() []byte { + length := int(f.ReadInt()) + + dst := make([]byte, length) + buf := f.ReadSlice(length) + + copy(dst, buf) + + 
return dst +} + +// this func does not change file pointer position and buffer +func (f *FileReader) ReadAt(length int, pos int64) []byte { + buf := make([]byte, length) + n, err := f.reader.ReadAt(buf, pos) + if err != nil && err != io.EOF && n != length { + panic(err) + } + //f.pos += int64(length) Review comment: old code? ########## File path: tsfile-go/src/tsfile/encoding/decoder/LongRleDecoder.go ########## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package decoder + +import ( + _ "bytes" + _ "log" + _ "strconv" + "tsfile/common/conf" + "tsfile/common/constant" + "tsfile/common/utils" + "tsfile/encoding/bitpacking" +) + +// rle-bit-packing-hybrid: <length> <bitwidth> <encoded-data> +// length := length of the <bitwidth> <encoded-data> in bytes stored as 4 bytes little endian +// bitwidth := bitwidth for all encoded data in <encoded-data> +// encoded-data := <bit-packed-run> | <rle-run> +// bit-packed-run := <bit-packed-header> <lastBitPackedNum> <bit-packed-values> +// bit-packed-header := varint-encode(<bit-pack-count> << 1 | 1) +// lastBitPackedNum := the number of useful value in last bit-pack may be less than 8, so lastBitPackedNum indicates how many values are useful +// bit-packed-values := bit packed +// rle-run := <rle-header> <repeated-value> +// rle-header := varint-encode( (number of times repeated) << 1) +// repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) + +type LongRleDecoder struct { + endianType constant.EndianType + dataType constant.TSDataType + + reader *utils.BytesReader + packer *bitpacking.LongPacker + + packageReader *utils.BytesReader + // how many bytes for all encoded data + length int + // bit width for bit-packing and rle to decode + bitWidth int + // number of data left for reading in current buffer + currentCount int + // mode to indicate current encoding type + mode int + // number of bit-packing group in which is saved in header + bitPackingNum int + + currentValue int64 + decodedValues []int64 + + isReadingBegan bool +} + +func (d *LongRleDecoder) Init(data []byte) { + d.reader = utils.NewBytesReader(data) + d.currentCount = 0 + d.currentValue = 0 + d.isReadingBegan = false +} + +func (d *LongRleDecoder) HasNext() bool { + if d.currentCount > 0 || d.reader.Len() > 0 || d.packageReader.Len() > 0 { + return true + } + return false +} + +func (d *LongRleDecoder) Next() interface{} { + if !d.isReadingBegan { + // read length 
and bit width of current package before we decode number + d.length = int(d.reader.ReadUnsignedVarInt()) + + d.packageReader = utils.NewBytesReader(d.reader.ReadSlice(d.length)) + d.bitWidth = int(d.packageReader.Read()) + + d.packer = &bitpacking.LongPacker{BitWidth: d.bitWidth} + + d.isReadingBegan = true + } + + if d.currentCount == 0 { + d.readPackage() + } + + d.currentCount-- + + var result int64 = 0 + switch d.mode { + case RLE: + result = d.currentValue + break + case BIT_PACKED: + result = d.decodedValues[d.bitPackingNum-d.currentCount-1] + break + default: + panic("tsfile-encoding LongRleDecoder: not a valid mode") + } + + // if d.currentCount > 0 || d.packageReader.Len() <= 0 { Review comment: old code? ########## File path: tsfile-go/src/tsfile/file/metadata/RowGroupMetaData.go ########## @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package metadata + +import ( + //_ "log" + "bytes" + "tsfile/common/constant" + "tsfile/common/log" + "tsfile/common/utils" +) + +type RowGroupMetaData struct { + device string + totalByteSize int64 + fileOffsetOfCorrespondingData int64 + serializedSize int + ChunkMetaDataSli []*ChunkMetaData + sizeOfChunkSli int +} + +func (f *RowGroupMetaData) Deserialize(reader *utils.BytesReader) { + f.device = reader.ReadString() + f.totalByteSize = reader.ReadLong() + f.fileOffsetOfCorrespondingData = reader.ReadLong() + size := int(reader.ReadInt()) + + f.serializedSize = constant.INT_LEN + len(f.device) + constant.LONG_LEN + constant.INT_LEN + + f.ChunkMetaDataSli = make([]*ChunkMetaData, 0) + for i := 0; i < size; i++ { + chunkMetaData := new(ChunkMetaData) + chunkMetaData.Deserialize(reader) + f.ChunkMetaDataSli = append(f.ChunkMetaDataSli, chunkMetaData) + f.serializedSize += chunkMetaData.GetSerializedSize() + } +} + +func (f *RowGroupMetaData) GetSerializedSize() int { + return f.serializedSize +} + +func (r *RowGroupMetaData) AddChunkMetaData(md *ChunkMetaData) { + if len(r.ChunkMetaDataSli) == 0 { + r.ChunkMetaDataSli = make([]*ChunkMetaData, 0) + } + r.ChunkMetaDataSli = append(r.ChunkMetaDataSli, md) + r.serializedSize += md.GetSerializedSize() + r.sizeOfChunkSli += 1 +} + +func (r *RowGroupMetaData) SetTotalByteSize(ms int64) { + r.totalByteSize = ms +} + +func (r *RowGroupMetaData) GetDeviceId() string { + return r.device +} + +func (r *RowGroupMetaData) SerializeTo(buf *bytes.Buffer) int { + if r.sizeOfChunkSli != len(r.ChunkMetaDataSli) { + r.RecalculateSerializedSize() + } + var byteLen int + + n1, _ := buf.Write(utils.Int32ToByte(int32(len(r.device)), 0)) + byteLen += n1 + n2, _ := buf.Write([]byte(r.device)) + byteLen += n2 + + n3, _ := buf.Write(utils.Int64ToByte(r.totalByteSize, 0)) + byteLen += n3 + n4, _ := buf.Write(utils.Int64ToByte(r.fileOffsetOfCorrespondingData, 0)) + byteLen += n4 + + n5, _ := 
buf.Write(utils.Int32ToByte(int32(len(r.ChunkMetaDataSli)), 0)) + byteLen += n5 + for _, v := range r.ChunkMetaDataSli { + byteLen += v.SerializeTo(buf) + } + + return byteLen +} + +func (r *RowGroupMetaData) GetChunkMetaDataSli() []*ChunkMetaData { + //if r.ChunkMetaDataSli == nil { Review comment: old code? ########## File path: tsfile-go/src/tsfile/encoding/encoder/IntDeltaEncoder.go ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package encoder + +import ( + "bytes" + "encoding/binary" + _ "fmt" + "math" + _ "time" + "tsfile/common/constant" + "tsfile/common/utils" +) + +const BLOCK_DEFAULT_SIZE = 128 + +type IntDeltaEncoder struct { + endianType constant.EndianType + dataType constant.TSDataType + + blockSize int32 + width int32 + index int32 + + baseValue int32 + firstValue int32 + previousValue int32 + encodedValues []int32 +} + +func (d *IntDeltaEncoder) Encode(v interface{}, buffer *bytes.Buffer) { + value := v.(int32) + + if d.index == -1 { + d.index++ + d.firstValue = value + d.previousValue = d.firstValue + return + } + + // calculate delta + delta := value - d.previousValue + if delta < d.baseValue { + d.baseValue = delta + } + d.encodedValues[d.index] = delta + d.index++ + + d.previousValue = value + if d.index == d.blockSize { + d.Flush(buffer) + } +} + +func (d *IntDeltaEncoder) Flush(buffer *bytes.Buffer) { + if d.index != -1 { + index := d.index + // since we store the min delta, the deltas will be converted to be the difference to min delta and all positive + for i := int32(0); i < index; i++ { + d.encodedValues[i] -= d.baseValue + } + + width := int32(0) + for i := int32(0); i < index; i++ { + valueWidth := int32(32) - utils.NumberOfLeadingZeros(d.encodedValues[i]) + if valueWidth > width { + width = valueWidth + } + } + + d.width = width + + //write header + binary.Write(buffer, binary.BigEndian, index) + binary.Write(buffer, binary.BigEndian, width) + binary.Write(buffer, binary.BigEndian, d.baseValue) + binary.Write(buffer, binary.BigEndian, d.firstValue) + //buffer.Write(utils.Int32ToByte(d.index, int16(constant.BIG_ENDIAN))) Review comment: old code? ########## File path: tsfile-go/src/tsfile/main.go ########## @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "os" + "time" + "tsfile/common/constant" + "tsfile/common/log" + "tsfile/timeseries/write/sensorDescriptor" + "tsfile/timeseries/write/tsFileWriter" +) + +const ( + fileName = "test.ts" +) + +func main() { + defer func() { + if err := recover(); err != nil { + log.Info("Error: ", err) + } + }() + + if _, err := os.Stat(fileName); !os.IsNotExist(err) { + os.Remove(fileName) + } + + // init tsFileWriter + tfWriter, tfwErr := tsFileWriter.NewTsFileWriter(fileName) + if tfwErr != nil { + log.Info("init tsFileWriter error = %s", tfwErr) + } + + // init sensorDescriptor + sd, sdErr := sensorDescriptor.NewWithCompress("sensor_1", constant.TEXT, constant.PLAIN, constant.SNAPPY) + if sdErr != nil { + log.Info("init sensorDescriptor error = %s", sdErr) + } + //sd2, sdErr2 := sensorDescriptor.New("sensor_2", constant.TEXT, constant.PLAIN) + //if sdErr2 != nil { + // log.Info("init sensorDescriptor error = %s", sdErr2) + //} + + // add sensorDescriptor to tfFileWriter + tfWriter.AddSensor(sd) + //tfWriter.AddSensor(sd2) + + // create a tsRecord + ts := time.Now() + //timestamp := strconv.FormatInt(ts.UTC().UnixNano(), 10) + //fmt.Println(timestamp) + tr, trErr := tsFileWriter.NewTsRecord(ts, "device_1") + if trErr != nil { + log.Info("init tsRecord error.") + } + + // create two data points + fdp, fDpErr := tsFileWriter.NewString("sensor_1", constant.TEXT, "hello 
moto!") + //fdp, fDpErr := tsFileWriter.NewDouble("sensor_1", constant.DOUBLE, 1.2) + if fDpErr != nil { + log.Info("init float data point error.") + } + //idp, iDpErr := tsFileWriter.NewString("sensor_2", constant.TEXT, "hello moto!") + //if iDpErr != nil { + // log.Info("init int data point error.") + //} + + // add data points to ts record + tr.AddTuple(fdp) + //tr.AddTuple(idp) + + // write tsRecord to file + tfWriter.Write(tr) + + //log.Info("init tsRecord device_1_2") Review comment: old code? ########## File path: tsfile-go/src/tsfile/timeseries/write/tsFileWriter/ValueWriter.go ########## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package tsFileWriter + +/** + * @Package Name: valueWriter + * @Author: steven yao + * @Email: [email protected] + * @Create Date: 18-8-31 下午4:51 + * @Description: + */ + +import ( + "bytes" + "tsfile/common/conf" + "tsfile/common/utils" + "tsfile/encoding/encoder" + "tsfile/timeseries/write/sensorDescriptor" +) + +type ValueWriter struct { + // time + timeEncoder encoder.Encoder //interface{} + valueEncoder encoder.Encoder //interface{} + timeBuf *bytes.Buffer + valueBuf *bytes.Buffer + desc *sensorDescriptor.SensorDescriptor + //buf := bytes.NewBuffer([]byte{}) +} + +func (v *ValueWriter) GetCurrentMemSize() int { + return v.timeBuf.Len() + v.valueBuf.Len() + + int(v.timeEncoder.GetMaxByteSize()) + int(v.valueEncoder.GetMaxByteSize()) +} + +func (v *ValueWriter) PrepareEndWriteOnePage() { + v.timeEncoder.Flush(v.timeBuf) + v.valueEncoder.Flush(v.valueBuf) +} + +func (v *ValueWriter) GetByteBuffer() *bytes.Buffer { + v.PrepareEndWriteOnePage() + encodeBuffer := bytes.NewBuffer([]byte{}) + var timeLen int32 = int32(v.timeBuf.Len()) + + // write timeBuf size + utils.WriteUnsignedVarInt(timeLen, encodeBuffer) + + //声明一个空的slice,容量为timebuf的长度 + timeSlice := make([]byte, timeLen) + //把buf的内容读入到timeSlice内,因为timeSlice容量为timeSize,所以只读了timeSize个过来 + v.timeBuf.Read(timeSlice) + encodeBuffer.Write(timeSlice) + + //声明一个空的value slice,容量为valuebuf的长度 + valueSlice := make([]byte, v.valueBuf.Len()) + //把buf的内容读入到timeSlice内,因为timeSlice容量为timeSize,所以只读了timeSize个过来 + v.valueBuf.Read(valueSlice) + encodeBuffer.Write(valueSlice) + + return encodeBuffer +} + +// write with encoder +func (v *ValueWriter) Write(t int64, tdt int16, data *DataPoint, valueCount int) { + v.timeEncoder.Encode(t, v.timeBuf) + switch tdt { + case 0, 1, 2, 3, 4, 5: + v.valueEncoder.Encode(data.value, v.valueBuf) + case 6: + //fixed_len_byte_array + case 7: + //enums + case 8: + //bigdecimal + default: + // int32 + } +} + +// write without encoder +func (v *ValueWriter) WriteWithoutEnc(t int64, tdt int16, 
value interface{}, valueCount int) { + var timeByteData []byte + var valueByteData []byte + switch tdt { + case 0: + // bool + if data, ok := value.(bool); ok { + // encode + valueByteData = utils.BoolToByte(data, 1) + } + case 1: + //int32 + if data, ok := value.(int32); ok { + valueByteData = utils.Int32ToByte(data, 1) + } + case 2: + //int64 + if data, ok := value.(int64); ok { + valueByteData = utils.Int64ToByte(data, 1) + } + + case 3: + //float + if data, ok := value.(float32); ok { + valueByteData = utils.Float32ToByte(data, 1) + } + case 4: + //double , float64 in golang as double in c + if data, ok := value.(float64); ok { + valueByteData = utils.Float64ToByte(data, 1) + } + case 5: + //text + if data, ok := value.(string); ok { + valueByteData = []byte(data) + } + case 6: + //fixed_len_byte_array + case 7: + //enums + case 8: + //bigdecimal + default: + // int32 + } + // write time to byteBuffer + timeByteData = utils.Int64ToByte(t, 1) + + // write to byteBuffer + if valueCount == 0 { + aa := []byte{24} + v.timeBuf.Write(aa) + //s.timeBuf.Write(utils.BoolToByte(true)) Review comment: old code? ########## File path: tsfile-go/src/tsfile/timeseries/write/tsFileWriter/IntDataPoint.go ########## @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package tsFileWriter + +import ( + "tsfile/common/constant" +) + +/** + * @Package Name: dataPoint + * @Author: steven yao + * @Email: [email protected] + * @Create Date: 18-8-27 下午3:19 + * @Description: + */ + +type IntDataPoint struct { + sensorId string + tsDataType int16 + value int32 +} + +//func (d *DataPoint) Write(v []byte) ([]byte,error) { Review comment: old code? ########## File path: tsfile-go/src/tsfile/timeseries/query/dataset/impl/TimestampQueryDataSet.go ########## @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package impl + +import ( + "errors" + "tsfile/common/constant" + "tsfile/common/log" + "tsfile/timeseries/filter" + "tsfile/timeseries/query/timegen" + "tsfile/timeseries/query/timegen/impl" + "tsfile/timeseries/read/datatype" + "tsfile/timeseries/read/reader" + "tsfile/timeseries/read/reader/impl/basic" + "tsfile/timeseries/read/reader/impl/seek" +) + +type TimestampQueryDataSet struct { + tGen timegen.ITimestampGenerator + rGen *basic.FilteredRowReader + r reader.ISeekableRowReader + + currTime int64 + current *datatype.RowRecord + exhausted bool +} + +func NewTimestampQueryDataSet(selectPaths []string, conditionPaths []string, + selectReaderMap map[string]reader.ISeekableTimeValuePairReader, conditionReaderMap map[string]reader.TimeValuePairReader, filter filter.Filter) *TimestampQueryDataSet { + tGen := impl.NewRowRecordTimestampGenerator(conditionPaths, conditionReaderMap, filter) + rGen := basic.NewFilteredRowReader(conditionPaths, conditionReaderMap, filter) + r := seek.NewSeekableRowReader(selectPaths, selectReaderMap) + return &TimestampQueryDataSet{tGen: tGen, rGen: rGen, r: r, currTime: constant.INVALID_TIMESTAMP, exhausted: false} +} + +func (set *TimestampQueryDataSet) fetch() { + //if set.tGen.HasNext() { Review comment: old code? ########## File path: tsfile-go/src/tsfile/timeseries/write/tsFileWriter/RowGroupWriter.go ########## @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package tsFileWriter + +/** + * @Package Name: rowGroupWriter + * @Author: steven yao + * @Email: [email protected] + * @Create Date: 18-8-28 下午6:19 + * @Description: + */ + +import ( + "tsfile/common/log" + _ "tsfile/common/utils" + "tsfile/file/header" + "tsfile/timeseries/write/sensorDescriptor" +) + +type RowGroupWriter struct { + deviceId string + dataSeriesWriters map[string]*SeriesWriter +} + +func (r *RowGroupWriter) AddSeriesWriter(sd *sensorDescriptor.SensorDescriptor, pageSize int) { + //start_edit wangcan 2018-10-15 + //if contain, _ := utils.MapContains(r.dataSeriesWriters, sd.GetSensorId()); !contain { + _, contain := r.dataSeriesWriters[sd.GetSensorId()] + if !contain { Review comment: old code? ########## File path: tsfile-go/src/tsfile/timeseries/write/tsFileWriter/TsRecord.go ########## @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package tsFileWriter + +/** + * @Package Name: tsRecord + * @Author: steven yao + * @Email: [email protected] + * @Create Date: 18-8-27 下午2:36 + * @Description: + */ + +import ( + "sync" + "time" +) + +type TsRecord struct { + time int64 + deviceId string + //dataPointMap map[string]*DataPoint + DataPointSli []*DataPoint + m sync.Mutex +} + +func (t *TsRecord) SetTime(time time.Time) { + t.time = time.Unix() + return +} + +func (t *TsRecord) SetTimeDeviceID(time time.Time, dId string) { + //PushBack(t, tuple) + // t.dataPointMap[t.deviceId] = tuple + //t.dataPointMap[t.deviceId] = tuple + //t.DataPointSli = dataSlice + t.time = time.Unix() + t.deviceId = dId + return +} + +func (t *TsRecord) SetTimestampDeviceID(ts int64, dId string) { + //PushBack(t, tuple) + // t.dataPointMap[t.deviceId] = tuple + //t.dataPointMap[t.deviceId] = tuple + //t.DataPointSli = dataSlice + t.time = ts + t.deviceId = dId + return +} + +func (t *TsRecord) SetDataPointSli(dataSlice []*DataPoint) { + //PushBack(t, tuple) + // t.dataPointMap[t.deviceId] = tuple + //t.dataPointMap[t.deviceId] = tuple + t.DataPointSli = dataSlice + return +} + +func (t *TsRecord) AddTuple(tuple *DataPoint) { + //PushBack(t, tuple) + // t.dataPointMap[t.deviceId] = tuple + //t.dataPointMap[t.deviceId] = tuple + t.DataPointSli = append(t.DataPointSli, tuple) + return +} + +func (t *TsRecord) GetTime() int64 { + return t.time +} + +func (t *TsRecord) GetDeviceId() string { + return t.deviceId +} + +func (t *TsRecord) GetDataPointSli() []*DataPoint { + return t.DataPointSli +} + +//func PushBack(t *TsRecord, tuple dataPoint.DataPoint) { Review comment: old code? ########## File path: tsfile-go/src/tsfile/timeseries/write/tsFileWriter/PageWriter.go ########## @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package tsFileWriter + +/** + * @Package Name: pageWriter + * @Author: steven yao + * @Email: [email protected] + * @Create Date: 18-8-31 下午3:51 + * @Description: + */ + +import ( + "bytes" + "tsfile/common/constant" + "tsfile/common/log" + "tsfile/compress" + "tsfile/file/header" + "tsfile/file/metadata/statistics" + "tsfile/timeseries/write/sensorDescriptor" +) + +type PageWriter struct { + compressor *compress.Encompress + desc *sensorDescriptor.SensorDescriptor + pageBuf *bytes.Buffer + totalValueCount int64 + maxTimestamp int64 + minTimestamp int64 +} + +func (p *PageWriter) WritePageHeaderAndDataIntoBuff(dataBuffer *bytes.Buffer, valueCount int, sts statistics.Statistics, maxTimestamp int64, minTimestamp int64) int { + if p.desc.GetCompresstionType() == int16(constant.UNCOMPRESSED) { + //this uncompressedSize should be calculate from timeBuf and valueBuf + uncompressedSize := dataBuffer.Len() + var compressedSize int = uncompressedSize + pageHeader, pageHeaderErr := header.NewPageHeader( + int32(uncompressedSize), int32(compressedSize), + int32(valueCount), sts, maxTimestamp, + minTimestamp, p.desc.GetTsDataType()) + if pageHeaderErr != nil { + log.Error("init pageHeader error: ", pageHeaderErr) + } + pageHeader.PageHeaderToMemory(p.pageBuf, 
p.desc.GetTsDataType()) + p.pageBuf.Write(dataBuffer.Bytes()) + p.totalValueCount += int64(valueCount) + } else { + //this uncompressedSize should be calculate from timeBuf and valueBuf + uncompressedSize := dataBuffer.Len() + + // write pageData to pageBuf + //声明一个空的slice,容量为dataBuffer的长度 + dataSlice := make([]byte, dataBuffer.Len()) + //把buf的内容读入到timeSlice内,因为timeSlice容量为timeSize,所以只读了timeSize个过来 + dataBuffer.Read(dataSlice) + + var compressedSize int + var enc []byte + aSlice := make([]byte, 0) + enc = p.compressor.GetEncompressor(p.desc.GetCompresstionType()).Encompress(aSlice, dataSlice) + compressedSize = len(enc) + + pageHeader, pageHeaderErr := header.NewPageHeader(int32(uncompressedSize), int32(compressedSize), int32(valueCount), sts, maxTimestamp, minTimestamp, p.desc.GetTsDataType()) + if pageHeaderErr != nil { + log.Error("init pageHeader error: ", pageHeaderErr) + } + // write pageheader to pageBuf Review comment: old code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
