jt2594838 commented on code in PR #731: URL: https://github.com/apache/tsfile/pull/731#discussion_r2867278858
########## cpp/src/file/restorable_tsfile_io_writer.cc: ########## @@ -0,0 +1,868 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "file/restorable_tsfile_io_writer.h" + +#include <fcntl.h> + +#include <algorithm> +#include <cstring> +#include <memory> +#include <vector> + +#include "common/allocator/byte_stream.h" +#include "common/device_id.h" +#include "common/statistic.h" +#include "common/tsfile_common.h" +#include "compress/compressor_factory.h" +#include "encoding/decoder_factory.h" +#include "utils/errno_define.h" + +#ifdef _WIN32 +#include <io.h> +#include <sys/stat.h> +#include <windows.h> +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset); +#else +#include <sys/stat.h> +#include <unistd.h> +#endif + +using namespace common; + +namespace storage { + +namespace { + +const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1; // magic + version +const int BUF_SIZE = 4096; + +// ----------------------------------------------------------------------------- +// Self-check helpers: read file, parse chunk header, recover chunk statistics +// ----------------------------------------------------------------------------- + +/** + * Lightweight read-only file handle for self-check only. 
+ * Use init_from_fd() when WriteFile is already open to avoid opening the file + * twice (fixes Windows file sharing and ensures we read the same content). + */ +struct SelfCheckReader { + int fd_; + int32_t file_size_; + bool own_fd_; // if false, do not close fd_ + + SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {} + + int init_from_fd(int fd) { + fd_ = fd; + own_fd_ = false; + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + int open(const std::string& path) { +#ifdef _WIN32 + fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY); +#else + fd_ = ::open(path.c_str(), O_RDONLY); +#endif + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } + own_fd_ = true; +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + void close() { + if (own_fd_ && fd_ >= 0) { +#ifdef _WIN32 + ::_close(fd_); +#else + ::close(fd_); +#endif + } + fd_ = -1; + file_size_ = -1; + } + + int32_t file_size() const { return file_size_; } + + int read(int32_t offset, char* buf, int32_t buf_size, int32_t& read_len) { + read_len = 0; + if (fd_ < 0) { + return E_FILE_READ_ERR; + } + ssize_t n = ::pread(fd_, buf, buf_size, offset); + if (n < 0) { + return E_FILE_READ_ERR; + } + read_len = static_cast<int32_t>(n); + return E_OK; + } +}; + +#ifdef _WIN32 +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset) { + DWORD read_bytes = 0; + OVERLAPPED ov = {}; + ov.OffsetHigh = (DWORD)((offset >> 32) & 0xFFFFFFFF); + ov.Offset 
= (DWORD)(offset & 0xFFFFFFFF); + HANDLE h = (HANDLE)_get_osfhandle(fd); + if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) { + if (GetLastError() != ERROR_HANDLE_EOF) { + return -1; + } + } + return (ssize_t)read_bytes; +} +#endif + +/** + * Parse chunk header at chunk_start and compute total chunk size (header + + * data). Does not read full chunk data; used to advance scan position. + * @param header_out If non-null, filled with the deserialized chunk header. + * @param bytes_consumed Set to header_len + data_size on success. + */ +static int parse_chunk_header_and_skip(SelfCheckReader& reader, + int64_t chunk_start, + int64_t& bytes_consumed, + ChunkHeader* header_out = nullptr) { + int32_t file_size = reader.file_size(); + int32_t max_read = static_cast<int32_t>( + std::min(static_cast<int64_t>(BUF_SIZE), file_size - chunk_start)); + if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + std::vector<char> buf(max_read); + int32_t read_len = 0; + int ret = reader.read(static_cast<int32_t>(chunk_start), buf.data(), + max_read, read_len); + if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + + ChunkHeader header; + ret = header.deserialize_from(bs); + if (ret != E_OK) { + return E_TSFILE_CORRUPTED; + } + + int header_len = bs.read_pos(); + int64_t total = header_len + header.data_size_; + if (chunk_start + total > file_size) { + return E_TSFILE_CORRUPTED; + } + + if (header_out != nullptr) { + *header_out = header; + } + bytes_consumed = total; + return E_OK; +} + +/** + * Recover chunk-level statistic from chunk data so that tail metadata can be + * generated correctly after recovery (aligned with Java TsFileSequenceReader + * selfCheck). Multi-page: merge each page header's statistic. Single-page: + * decode page data and update stat. 
For aligned value chunks, time_batch + * (from the time chunk in the same group) must be provided. + */ +static int recover_chunk_statistic( + const ChunkHeader& chdr, const char* chunk_data, int32_t data_size, + Statistic* out_stat, common::PageArena* pa, + const std::vector<int64_t>* time_batch = nullptr, + std::vector<int64_t>* out_time_batch = nullptr) { + if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) { + return E_OK; + } + common::ByteStream bs; + bs.wrap_from(const_cast<char*>(chunk_data), + static_cast<uint32_t>(data_size)); + // Multi-page chunk: high bits of chunk_type_ are 0x00, low 6 bits = + // CHUNK_HEADER_MARKER + const bool multi_page = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x3F) == + static_cast<unsigned char>(CHUNK_HEADER_MARKER); + + if (multi_page) { + while (bs.remaining_size() > 0) { + PageHeader ph; + int ret = ph.deserialize_from(bs, true, chdr.data_type_); + if (ret != common::E_OK) { + return ret; + } + uint32_t comp = ph.compressed_size_; + if (ph.statistic_ != nullptr) { + if (out_stat->merge_with(ph.statistic_) != common::E_OK) { + ph.reset(); + return common::E_TSFILE_CORRUPTED; + } + } + ph.reset(); + bs.wrapped_buf_advance_read_pos(comp); + } + return E_OK; + } + + // Single-page chunk: statistic is not in page header; decompress and decode + // to fill out_stat. is_time_column: bit 0x80 in chunk_type_ indicates time + // column (aligned model). 
+ const bool is_time_column = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != 0; + PageHeader ph; + int ret = ph.deserialize_from(bs, false, chdr.data_type_); + if (ret != common::E_OK || ph.compressed_size_ == 0 || + bs.remaining_size() < ph.compressed_size_) { + return E_OK; + } + const char* compressed_ptr = + chunk_data + (data_size - static_cast<int32_t>(bs.remaining_size())); + char* uncompressed_buf = nullptr; + uint32_t uncompressed_size = 0; + Compressor* compressor = + CompressorFactory::alloc_compressor(chdr.compression_type_); + if (compressor == nullptr) { + return common::E_OOM; + } + ret = compressor->reset(false); + if (ret != common::E_OK) { + CompressorFactory::free(compressor); + return ret; + } + ret = compressor->uncompress(const_cast<char*>(compressed_ptr), + ph.compressed_size_, uncompressed_buf, + uncompressed_size); + if (ret != common::E_OK || uncompressed_buf == nullptr || + uncompressed_size != ph.uncompressed_size_) { + if (uncompressed_buf != nullptr) { + compressor->after_uncompress(uncompressed_buf); + } + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + if (is_time_column) { + /* Time chunk: uncompressed = raw time stream only (no var_uint). 
*/ + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(uncompressed_buf, uncompressed_size); + time_decoder->reset(); + int64_t t; + if (out_time_batch != nullptr) { + out_time_batch->clear(); + } + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + out_stat->update(t); + if (out_time_batch != nullptr) { + out_time_batch->push_back(t); + } + } + DecoderFactory::free(time_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + + /* Value chunk: parse layout and decode. */ + const char* value_buf = nullptr; + uint32_t value_buf_size = 0; + std::vector<int64_t> time_decode_buf; + const std::vector<int64_t>* times = nullptr; + + if (time_batch != nullptr && !time_batch->empty()) { + // Aligned value page: uncompressed layout = uint32(num_values) + bitmap + // + value_buf + if (uncompressed_size < 4) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + uint32_t num_values = + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[0])) + << 24) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[1])) + << 16) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[2])) + << 8) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[3]))); + uint32_t bitmap_size = (num_values + 7) / 8; + if (uncompressed_size < 4 + bitmap_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + value_buf = uncompressed_buf + 4 + bitmap_size; + value_buf_size = uncompressed_size - 4 - bitmap_size; + times = time_batch; + } else { + // Non-aligned value page: 
var_uint(time_buf_size) + time_buf + + // value_buf + int var_size = 0; + uint32_t time_buf_size = 0; + ret = common::SerializationUtil::read_var_uint( + time_buf_size, uncompressed_buf, + static_cast<int>(uncompressed_size), &var_size); + if (ret != common::E_OK || + static_cast<uint32_t>(var_size) + time_buf_size > + uncompressed_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + const char* time_buf = uncompressed_buf + var_size; + value_buf = time_buf + time_buf_size; + value_buf_size = + uncompressed_size - static_cast<uint32_t>(var_size) - time_buf_size; + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(const_cast<char*>(time_buf), time_buf_size); + time_decoder->reset(); + time_decode_buf.clear(); + int64_t t; + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + time_decode_buf.push_back(t); + } + DecoderFactory::free(time_decoder); + times = &time_decode_buf; + } + + Decoder* value_decoder = DecoderFactory::alloc_value_decoder( + chdr.encoding_type_, chdr.data_type_); + if (value_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream value_in; + value_in.wrap_from(const_cast<char*>(value_buf), value_buf_size); + value_decoder->reset(); + size_t idx = 0; + const size_t num_times = times->size(); + while (idx < num_times && value_decoder->has_remaining(value_in)) { + int64_t t = (*times)[idx]; + switch (chdr.data_type_) { + case common::BOOLEAN: { + bool v; + if (value_decoder->read_boolean(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + 
break; + } + case common::INT32: + case common::DATE: { + int32_t v; + if (value_decoder->read_int32(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::INT64: + case common::TIMESTAMP: { + int64_t v; + if (value_decoder->read_int64(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::FLOAT: { + float v; + if (value_decoder->read_float(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::DOUBLE: { + double v; + if (value_decoder->read_double(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::TEXT: + case common::BLOB: + case common::STRING: { + common::String v; + if (pa != nullptr && value_decoder->read_String( + v, *pa, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + default: + break; + } + idx++; + } + DecoderFactory::free(value_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; +} + +} // namespace + +RestorableTsFileIOWriter::RestorableTsFileIOWriter() + : TsFileIOWriter(), + write_file_(nullptr), + write_file_owned_(false), + truncated_size_(-1), + crashed_(false), + can_write_(false) { + self_check_arena_.init(512, MOD_TSFILE_READER); +} + +RestorableTsFileIOWriter::~RestorableTsFileIOWriter() { close(); } + +void RestorableTsFileIOWriter::close() { + if (write_file_owned_ && write_file_ != nullptr) { + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + } + for (ChunkGroupMeta* cgm : self_check_recovered_cgm_) { + cgm->device_id_.reset(); + } + self_check_recovered_cgm_.clear(); + self_check_arena_.destroy(); +} + +int RestorableTsFileIOWriter::open(const std::string& file_path, + bool truncate_corrupted) { + if (write_file_ != nullptr) { + return E_ALREADY_EXIST; + } + + file_path_ = file_path; + write_file_ = new WriteFile(); + write_file_owned_ = true; + + // 
O_RDWR|O_CREAT without O_TRUNC: preserve existing file content +#ifdef _WIN32 + const int flags = O_RDWR | O_CREAT | O_BINARY; +#else + const int flags = O_RDWR | O_CREAT; +#endif + const mode_t mode = 0644; + + int ret = write_file_->create(file_path_, flags, mode); + if (ret != E_OK) { + close(); + return ret; + } + + ret = self_check(truncate_corrupted); + if (ret != E_OK) { + close(); + return ret; + } + + return E_OK; +} + +int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) { + SelfCheckReader reader; + // Use a separate read-only handle for self-check: on Windows, sharing the + // O_RDWR fd can cause stale/cached reads when detecting a complete file. + int ret = reader.open(file_path_); + if (ret != E_OK) { + return ret; + } + + int32_t file_size = reader.file_size(); + + // --- Empty file: treat as crashed, allow writing from scratch --- + if (file_size == 0) { + reader.close(); + truncated_size_ = 0; + crashed_ = true; + can_write_ = true; + if (write_file_->seek_to_end() != E_OK) { + return E_FILE_READ_ERR; + } + ret = init(write_file_); + if (ret != E_OK) { + return ret; + } + ret = start_file(); + if (ret != E_OK) { + return ret; + } + return E_OK; + } + + // --- File too short or invalid header => not a valid TsFile --- + if (file_size < HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + char header_buf[HEADER_LEN]; + int32_t read_len = 0; + ret = reader.read(0, header_buf, HEADER_LEN, read_len); + if (ret != E_OK || read_len != HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (memcmp(header_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) != 0) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (header_buf[MAGIC_STRING_TSFILE_LEN] != VERSION_NUM_BYTE) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + 
} + + // --- Completeness check (aligned with Java isComplete()) --- + // Require size >= 2*magic + version_byte and tail magic same as head magic. + bool is_complete = false; + if (file_size >= static_cast<int32_t>(MAGIC_STRING_TSFILE_LEN * 2 + 1)) { + char tail_buf[MAGIC_STRING_TSFILE_LEN]; + ret = reader.read(file_size - MAGIC_STRING_TSFILE_LEN, tail_buf, + MAGIC_STRING_TSFILE_LEN, read_len); + if (ret == E_OK && read_len == MAGIC_STRING_TSFILE_LEN && + memcmp(tail_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) == + 0) { + is_complete = true; + } + } + + // --- File is complete: no recovery, close write handle and return --- + if (is_complete) { + reader.close(); + truncated_size_ = TSFILE_CHECK_COMPLETE; + crashed_ = false; + can_write_ = false; + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + return E_OK; + } + + // --- Recovery path: scan from header to find last valid truncation point + // --- + int64_t truncated = HEADER_LEN; + int64_t pos = HEADER_LEN; + std::vector<char> buf(BUF_SIZE); + + // Recover schema and chunk group meta (aligned with Java selfCheck). + // cur_group_time_batch: timestamps decoded from time chunk, used by aligned + // value chunks. 
+ std::shared_ptr<IDeviceID> cur_device_id; + ChunkGroupMeta* cur_cgm = nullptr; + std::vector<ChunkGroupMeta*> recovered_cgm_list; + std::vector<int64_t> cur_group_time_batch; + + auto flush_chunk_group = [this, &cur_device_id, &cur_cgm, + &recovered_cgm_list]() { + if (cur_cgm != nullptr && cur_device_id != nullptr) { + get_schema()->update_table_schema(cur_cgm); + recovered_cgm_list.push_back(cur_cgm); + self_check_recovered_cgm_.push_back(cur_cgm); + cur_cgm = nullptr; + } + }; + + while (pos < file_size) { + unsigned char marker; + ret = reader.read(static_cast<int32_t>(pos), + reinterpret_cast<char*>(&marker), 1, read_len); + if (ret != E_OK || read_len != 1) { + break; + } + pos += 1; + + if (marker == static_cast<unsigned char>(SEPARATOR_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + break; + } + + if (marker == static_cast<unsigned char>(CHUNK_GROUP_HEADER_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + cur_group_time_batch.clear(); + int seg_len = 0; + ret = reader.read(static_cast<int32_t>(pos), buf.data(), BUF_SIZE, + read_len); + if (ret != E_OK || read_len < 1) { + break; + } + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + cur_device_id = std::make_shared<StringArrayDeviceID>("init"); + ret = cur_device_id->deserialize(bs); + if (ret != E_OK) { + break; + } + seg_len = bs.read_pos(); + pos += seg_len; + cur_cgm = new (self_check_arena_.alloc(sizeof(ChunkGroupMeta))) + ChunkGroupMeta(&self_check_arena_); + cur_cgm->init(cur_device_id); + continue; + } + + if (marker == static_cast<unsigned char>(OPERATION_INDEX_RANGE)) { + truncated = pos - 1; + flush_chunk_group(); + cur_device_id.reset(); + if (pos + 2 * 8 > static_cast<int64_t>(file_size)) { + break; + } + char range_buf[16]; + ret = + reader.read(static_cast<int32_t>(pos), range_buf, 16, read_len); + if (ret != E_OK || read_len != 16) { + break; + } + pos += 16; + truncated = pos; + continue; + } + + if (marker == static_cast<unsigned char>(CHUNK_HEADER_MARKER) || + 
marker == + static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast<unsigned char>(CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)) { + int64_t chunk_start = pos - 1; + int64_t consumed = 0; + ChunkHeader chdr; + ret = parse_chunk_header_and_skip(reader, chunk_start, consumed, + &chdr); + if (ret != E_OK) { + break; + } + pos = chunk_start + consumed; + truncated = pos; + if (cur_cgm != nullptr) { + void* cm_buf = self_check_arena_.alloc(sizeof(ChunkMeta)); + if (IS_NULL(cm_buf)) { + ret = common::E_OOM; + break; + } + auto* cm = new (cm_buf) ChunkMeta(); + common::String mname; + mname.dup_from(chdr.measurement_name_, self_check_arena_); + Statistic* stat = StatisticFactory::alloc_statistic_with_pa( + static_cast<common::TSDataType>(chdr.data_type_), + &self_check_arena_); + if (IS_NULL(stat)) { + ret = common::E_OOM; + break; + } + stat->reset(); + if (chdr.data_size_ > 0) { + const int32_t header_len = + static_cast<int32_t>(consumed) - chdr.data_size_; + if (header_len > 0 && chunk_start + consumed <= + static_cast<int64_t>(file_size)) { + std::vector<char> chunk_data(chdr.data_size_); + int32_t read_len = 0; + ret = reader.read( + static_cast<int32_t>(chunk_start + header_len), + chunk_data.data(), chdr.data_size_, read_len); + if (ret == E_OK && read_len == chdr.data_size_) { + ret = recover_chunk_statistic( + chdr, chunk_data.data(), chdr.data_size_, stat, + &self_check_arena_, &cur_group_time_batch, + &cur_group_time_batch); + } + if (ret != E_OK) { + break; + } + } + } + cm->init(mname, + static_cast<common::TSDataType>(chdr.data_type_), + chunk_start, stat, 0, + static_cast<common::TSEncoding>(chdr.encoding_type_), + static_cast<common::CompressionType>( + chdr.compression_type_), + self_check_arena_); + cur_cgm->push(cm); + if (cur_device_id != nullptr && + (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != + 0) { Review Comment: Add a comment 
for this or use a meaningful macro instead ########## cpp/src/file/restorable_tsfile_io_writer.cc: ########## @@ -0,0 +1,868 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "file/restorable_tsfile_io_writer.h" + +#include <fcntl.h> + +#include <algorithm> +#include <cstring> +#include <memory> +#include <vector> + +#include "common/allocator/byte_stream.h" +#include "common/device_id.h" +#include "common/statistic.h" +#include "common/tsfile_common.h" +#include "compress/compressor_factory.h" +#include "encoding/decoder_factory.h" +#include "utils/errno_define.h" + +#ifdef _WIN32 +#include <io.h> +#include <sys/stat.h> +#include <windows.h> +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset); +#else +#include <sys/stat.h> +#include <unistd.h> +#endif + +using namespace common; + +namespace storage { + +namespace { + +const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1; // magic + version +const int BUF_SIZE = 4096; + +// ----------------------------------------------------------------------------- +// Self-check helpers: read file, parse chunk header, recover chunk statistics +// ----------------------------------------------------------------------------- + +/** + * Lightweight 
read-only file handle for self-check only. + * Use init_from_fd() when WriteFile is already open to avoid opening the file + * twice (fixes Windows file sharing and ensures we read the same content). + */ +struct SelfCheckReader { + int fd_; + int32_t file_size_; + bool own_fd_; // if false, do not close fd_ + + SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {} + + int init_from_fd(int fd) { + fd_ = fd; + own_fd_ = false; + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + int open(const std::string& path) { +#ifdef _WIN32 + fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY); +#else + fd_ = ::open(path.c_str(), O_RDONLY); +#endif + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } + own_fd_ = true; +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + void close() { + if (own_fd_ && fd_ >= 0) { +#ifdef _WIN32 + ::_close(fd_); +#else + ::close(fd_); +#endif + } + fd_ = -1; + file_size_ = -1; + } + + int32_t file_size() const { return file_size_; } + + int read(int32_t offset, char* buf, int32_t buf_size, int32_t& read_len) { + read_len = 0; + if (fd_ < 0) { + return E_FILE_READ_ERR; + } + ssize_t n = ::pread(fd_, buf, buf_size, offset); + if (n < 0) { + return E_FILE_READ_ERR; + } + read_len = static_cast<int32_t>(n); + return E_OK; + } +}; + +#ifdef _WIN32 +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset) { + DWORD read_bytes = 0; + OVERLAPPED ov = {}; + ov.OffsetHigh = 
(DWORD)((offset >> 32) & 0xFFFFFFFF); + ov.Offset = (DWORD)(offset & 0xFFFFFFFF); + HANDLE h = (HANDLE)_get_osfhandle(fd); + if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) { + if (GetLastError() != ERROR_HANDLE_EOF) { + return -1; + } + } + return (ssize_t)read_bytes; +} +#endif + +/** + * Parse chunk header at chunk_start and compute total chunk size (header + + * data). Does not read full chunk data; used to advance scan position. + * @param header_out If non-null, filled with the deserialized chunk header. + * @param bytes_consumed Set to header_len + data_size on success. + */ +static int parse_chunk_header_and_skip(SelfCheckReader& reader, + int64_t chunk_start, + int64_t& bytes_consumed, + ChunkHeader* header_out = nullptr) { + int32_t file_size = reader.file_size(); + int32_t max_read = static_cast<int32_t>( + std::min(static_cast<int64_t>(BUF_SIZE), file_size - chunk_start)); + if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + std::vector<char> buf(max_read); + int32_t read_len = 0; + int ret = reader.read(static_cast<int32_t>(chunk_start), buf.data(), + max_read, read_len); + if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + + ChunkHeader header; + ret = header.deserialize_from(bs); + if (ret != E_OK) { + return E_TSFILE_CORRUPTED; + } + + int header_len = bs.read_pos(); + int64_t total = header_len + header.data_size_; + if (chunk_start + total > file_size) { + return E_TSFILE_CORRUPTED; + } + + if (header_out != nullptr) { + *header_out = header; + } + bytes_consumed = total; + return E_OK; +} + +/** + * Recover chunk-level statistic from chunk data so that tail metadata can be + * generated correctly after recovery (aligned with Java TsFileSequenceReader + * selfCheck). Multi-page: merge each page header's statistic. Single-page: + * decode page data and update stat. 
For aligned value chunks, time_batch + * (from the time chunk in the same group) must be provided. + */ +static int recover_chunk_statistic( + const ChunkHeader& chdr, const char* chunk_data, int32_t data_size, + Statistic* out_stat, common::PageArena* pa, + const std::vector<int64_t>* time_batch = nullptr, + std::vector<int64_t>* out_time_batch = nullptr) { + if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) { + return E_OK; + } + common::ByteStream bs; + bs.wrap_from(const_cast<char*>(chunk_data), + static_cast<uint32_t>(data_size)); + // Multi-page chunk: high bits of chunk_type_ are 0x00, low 6 bits = + // CHUNK_HEADER_MARKER + const bool multi_page = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x3F) == + static_cast<unsigned char>(CHUNK_HEADER_MARKER); + + if (multi_page) { + while (bs.remaining_size() > 0) { + PageHeader ph; + int ret = ph.deserialize_from(bs, true, chdr.data_type_); + if (ret != common::E_OK) { + return ret; + } + uint32_t comp = ph.compressed_size_; + if (ph.statistic_ != nullptr) { + if (out_stat->merge_with(ph.statistic_) != common::E_OK) { + ph.reset(); + return common::E_TSFILE_CORRUPTED; + } + } + ph.reset(); + bs.wrapped_buf_advance_read_pos(comp); + } + return E_OK; + } + + // Single-page chunk: statistic is not in page header; decompress and decode + // to fill out_stat. is_time_column: bit 0x80 in chunk_type_ indicates time + // column (aligned model). 
+ const bool is_time_column = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != 0; + PageHeader ph; + int ret = ph.deserialize_from(bs, false, chdr.data_type_); + if (ret != common::E_OK || ph.compressed_size_ == 0 || + bs.remaining_size() < ph.compressed_size_) { + return E_OK; + } + const char* compressed_ptr = + chunk_data + (data_size - static_cast<int32_t>(bs.remaining_size())); + char* uncompressed_buf = nullptr; + uint32_t uncompressed_size = 0; + Compressor* compressor = + CompressorFactory::alloc_compressor(chdr.compression_type_); + if (compressor == nullptr) { + return common::E_OOM; + } + ret = compressor->reset(false); + if (ret != common::E_OK) { + CompressorFactory::free(compressor); + return ret; + } + ret = compressor->uncompress(const_cast<char*>(compressed_ptr), + ph.compressed_size_, uncompressed_buf, + uncompressed_size); + if (ret != common::E_OK || uncompressed_buf == nullptr || + uncompressed_size != ph.uncompressed_size_) { + if (uncompressed_buf != nullptr) { + compressor->after_uncompress(uncompressed_buf); + } + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + if (is_time_column) { + /* Time chunk: uncompressed = raw time stream only (no var_uint). 
*/ + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(uncompressed_buf, uncompressed_size); + time_decoder->reset(); + int64_t t; + if (out_time_batch != nullptr) { + out_time_batch->clear(); + } + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + out_stat->update(t); + if (out_time_batch != nullptr) { + out_time_batch->push_back(t); + } + } + DecoderFactory::free(time_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + + /* Value chunk: parse layout and decode. */ + const char* value_buf = nullptr; + uint32_t value_buf_size = 0; + std::vector<int64_t> time_decode_buf; + const std::vector<int64_t>* times = nullptr; + + if (time_batch != nullptr && !time_batch->empty()) { + // Aligned value page: uncompressed layout = uint32(num_values) + bitmap + // + value_buf + if (uncompressed_size < 4) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + uint32_t num_values = + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[0])) + << 24) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[1])) + << 16) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[2])) + << 8) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[3]))); + uint32_t bitmap_size = (num_values + 7) / 8; + if (uncompressed_size < 4 + bitmap_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + value_buf = uncompressed_buf + 4 + bitmap_size; + value_buf_size = uncompressed_size - 4 - bitmap_size; + times = time_batch; + } else { + // Non-aligned value page: 
var_uint(time_buf_size) + time_buf + + // value_buf + int var_size = 0; + uint32_t time_buf_size = 0; + ret = common::SerializationUtil::read_var_uint( + time_buf_size, uncompressed_buf, + static_cast<int>(uncompressed_size), &var_size); + if (ret != common::E_OK || + static_cast<uint32_t>(var_size) + time_buf_size > + uncompressed_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + const char* time_buf = uncompressed_buf + var_size; + value_buf = time_buf + time_buf_size; + value_buf_size = + uncompressed_size - static_cast<uint32_t>(var_size) - time_buf_size; + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(const_cast<char*>(time_buf), time_buf_size); + time_decoder->reset(); + time_decode_buf.clear(); + int64_t t; + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + time_decode_buf.push_back(t); + } + DecoderFactory::free(time_decoder); + times = &time_decode_buf; + } + + Decoder* value_decoder = DecoderFactory::alloc_value_decoder( + chdr.encoding_type_, chdr.data_type_); + if (value_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream value_in; + value_in.wrap_from(const_cast<char*>(value_buf), value_buf_size); + value_decoder->reset(); + size_t idx = 0; + const size_t num_times = times->size(); + while (idx < num_times && value_decoder->has_remaining(value_in)) { + int64_t t = (*times)[idx]; + switch (chdr.data_type_) { + case common::BOOLEAN: { + bool v; + if (value_decoder->read_boolean(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + 
break; + } + case common::INT32: + case common::DATE: { + int32_t v; + if (value_decoder->read_int32(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::INT64: + case common::TIMESTAMP: { + int64_t v; + if (value_decoder->read_int64(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::FLOAT: { + float v; + if (value_decoder->read_float(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::DOUBLE: { + double v; + if (value_decoder->read_double(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::TEXT: + case common::BLOB: + case common::STRING: { + common::String v; + if (pa != nullptr && value_decoder->read_String( + v, *pa, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + default: + break; + } + idx++; + } + DecoderFactory::free(value_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; +} + +} // namespace + +RestorableTsFileIOWriter::RestorableTsFileIOWriter() + : TsFileIOWriter(), + write_file_(nullptr), + write_file_owned_(false), + truncated_size_(-1), + crashed_(false), + can_write_(false) { + self_check_arena_.init(512, MOD_TSFILE_READER); +} + +RestorableTsFileIOWriter::~RestorableTsFileIOWriter() { close(); } + +void RestorableTsFileIOWriter::close() { + if (write_file_owned_ && write_file_ != nullptr) { + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + } + for (ChunkGroupMeta* cgm : self_check_recovered_cgm_) { + cgm->device_id_.reset(); + } + self_check_recovered_cgm_.clear(); + self_check_arena_.destroy(); +} + +int RestorableTsFileIOWriter::open(const std::string& file_path, + bool truncate_corrupted) { + if (write_file_ != nullptr) { + return E_ALREADY_EXIST; + } + + file_path_ = file_path; + write_file_ = new WriteFile(); + write_file_owned_ = true; + + // 
O_RDWR|O_CREAT without O_TRUNC: preserve existing file content +#ifdef _WIN32 + const int flags = O_RDWR | O_CREAT | O_BINARY; +#else + const int flags = O_RDWR | O_CREAT; +#endif + const mode_t mode = 0644; + + int ret = write_file_->create(file_path_, flags, mode); + if (ret != E_OK) { + close(); + return ret; + } + + ret = self_check(truncate_corrupted); + if (ret != E_OK) { + close(); + return ret; + } + + return E_OK; +} + +int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) { + SelfCheckReader reader; + // Use a separate read-only handle for self-check: on Windows, sharing the + // O_RDWR fd can cause stale/cached reads when detecting a complete file. + int ret = reader.open(file_path_); + if (ret != E_OK) { + return ret; + } + + int32_t file_size = reader.file_size(); + + // --- Empty file: treat as crashed, allow writing from scratch --- + if (file_size == 0) { + reader.close(); + truncated_size_ = 0; + crashed_ = true; + can_write_ = true; + if (write_file_->seek_to_end() != E_OK) { + return E_FILE_READ_ERR; + } + ret = init(write_file_); + if (ret != E_OK) { + return ret; + } + ret = start_file(); + if (ret != E_OK) { + return ret; + } + return E_OK; + } + + // --- File too short or invalid header => not a valid TsFile --- + if (file_size < HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + char header_buf[HEADER_LEN]; + int32_t read_len = 0; + ret = reader.read(0, header_buf, HEADER_LEN, read_len); + if (ret != E_OK || read_len != HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (memcmp(header_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) != 0) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (header_buf[MAGIC_STRING_TSFILE_LEN] != VERSION_NUM_BYTE) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + 
} + + // --- Completeness check (aligned with Java isComplete()) --- + // Require size >= 2*magic + version_byte and tail magic same as head magic. + bool is_complete = false; + if (file_size >= static_cast<int32_t>(MAGIC_STRING_TSFILE_LEN * 2 + 1)) { + char tail_buf[MAGIC_STRING_TSFILE_LEN]; + ret = reader.read(file_size - MAGIC_STRING_TSFILE_LEN, tail_buf, + MAGIC_STRING_TSFILE_LEN, read_len); + if (ret == E_OK && read_len == MAGIC_STRING_TSFILE_LEN && + memcmp(tail_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) == + 0) { + is_complete = true; + } + } + + // --- File is complete: no recovery, close write handle and return --- + if (is_complete) { + reader.close(); + truncated_size_ = TSFILE_CHECK_COMPLETE; + crashed_ = false; + can_write_ = false; + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + return E_OK; + } + + // --- Recovery path: scan from header to find last valid truncation point + // --- + int64_t truncated = HEADER_LEN; + int64_t pos = HEADER_LEN; + std::vector<char> buf(BUF_SIZE); + + // Recover schema and chunk group meta (aligned with Java selfCheck). + // cur_group_time_batch: timestamps decoded from time chunk, used by aligned + // value chunks. 
+ std::shared_ptr<IDeviceID> cur_device_id; + ChunkGroupMeta* cur_cgm = nullptr; + std::vector<ChunkGroupMeta*> recovered_cgm_list; + std::vector<int64_t> cur_group_time_batch; + + auto flush_chunk_group = [this, &cur_device_id, &cur_cgm, + &recovered_cgm_list]() { + if (cur_cgm != nullptr && cur_device_id != nullptr) { + get_schema()->update_table_schema(cur_cgm); + recovered_cgm_list.push_back(cur_cgm); + self_check_recovered_cgm_.push_back(cur_cgm); + cur_cgm = nullptr; + } + }; + + while (pos < file_size) { + unsigned char marker; + ret = reader.read(static_cast<int32_t>(pos), + reinterpret_cast<char*>(&marker), 1, read_len); + if (ret != E_OK || read_len != 1) { + break; + } + pos += 1; + + if (marker == static_cast<unsigned char>(SEPARATOR_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + break; + } + + if (marker == static_cast<unsigned char>(CHUNK_GROUP_HEADER_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + cur_group_time_batch.clear(); + int seg_len = 0; + ret = reader.read(static_cast<int32_t>(pos), buf.data(), BUF_SIZE, + read_len); + if (ret != E_OK || read_len < 1) { + break; + } + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + cur_device_id = std::make_shared<StringArrayDeviceID>("init"); + ret = cur_device_id->deserialize(bs); + if (ret != E_OK) { + break; + } + seg_len = bs.read_pos(); + pos += seg_len; + cur_cgm = new (self_check_arena_.alloc(sizeof(ChunkGroupMeta))) + ChunkGroupMeta(&self_check_arena_); + cur_cgm->init(cur_device_id); + continue; + } + + if (marker == static_cast<unsigned char>(OPERATION_INDEX_RANGE)) { + truncated = pos - 1; + flush_chunk_group(); + cur_device_id.reset(); + if (pos + 2 * 8 > static_cast<int64_t>(file_size)) { + break; + } + char range_buf[16]; + ret = + reader.read(static_cast<int32_t>(pos), range_buf, 16, read_len); + if (ret != E_OK || read_len != 16) { + break; + } + pos += 16; + truncated = pos; + continue; + } + + if (marker == static_cast<unsigned char>(CHUNK_HEADER_MARKER) || + 
marker == + static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast<unsigned char>(CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)) { + int64_t chunk_start = pos - 1; + int64_t consumed = 0; + ChunkHeader chdr; + ret = parse_chunk_header_and_skip(reader, chunk_start, consumed, + &chdr); + if (ret != E_OK) { + break; + } + pos = chunk_start + consumed; + truncated = pos; + if (cur_cgm != nullptr) { + void* cm_buf = self_check_arena_.alloc(sizeof(ChunkMeta)); + if (IS_NULL(cm_buf)) { + ret = common::E_OOM; + break; + } + auto* cm = new (cm_buf) ChunkMeta(); + common::String mname; + mname.dup_from(chdr.measurement_name_, self_check_arena_); + Statistic* stat = StatisticFactory::alloc_statistic_with_pa( + static_cast<common::TSDataType>(chdr.data_type_), + &self_check_arena_); + if (IS_NULL(stat)) { + ret = common::E_OOM; + break; + } + stat->reset(); + if (chdr.data_size_ > 0) { + const int32_t header_len = + static_cast<int32_t>(consumed) - chdr.data_size_; + if (header_len > 0 && chunk_start + consumed <= + static_cast<int64_t>(file_size)) { + std::vector<char> chunk_data(chdr.data_size_); + int32_t read_len = 0; + ret = reader.read( + static_cast<int32_t>(chunk_start + header_len), + chunk_data.data(), chdr.data_size_, read_len); + if (ret == E_OK && read_len == chdr.data_size_) { + ret = recover_chunk_statistic( + chdr, chunk_data.data(), chdr.data_size_, stat, + &self_check_arena_, &cur_group_time_batch, + &cur_group_time_batch); + } + if (ret != E_OK) { + break; + } + } + } + cm->init(mname, + static_cast<common::TSDataType>(chdr.data_type_), + chunk_start, stat, 0, + static_cast<common::TSEncoding>(chdr.encoding_type_), + static_cast<common::CompressionType>( + chdr.compression_type_), + self_check_arena_); + cur_cgm->push(cm); + if (cur_device_id != nullptr && + (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != + 0) { + 
aligned_devices_.insert(cur_device_id->get_table_name()); Review Comment: Explain this (comment anchored on the quoted line above: `aligned_devices_.insert(cur_device_id->get_table_name());`, which runs when the recovered chunk's `chunk_type_` has the 0x80 bit set and records the table name, not the full device ID) ########## cpp/src/file/restorable_tsfile_io_writer.cc: ########## @@ -0,0 +1,868 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "file/restorable_tsfile_io_writer.h" + +#include <fcntl.h> + +#include <algorithm> +#include <cstring> +#include <memory> +#include <vector> + +#include "common/allocator/byte_stream.h" +#include "common/device_id.h" +#include "common/statistic.h" +#include "common/tsfile_common.h" +#include "compress/compressor_factory.h" +#include "encoding/decoder_factory.h" +#include "utils/errno_define.h" + +#ifdef _WIN32 +#include <io.h> +#include <sys/stat.h> +#include <windows.h> +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset); +#else +#include <sys/stat.h> +#include <unistd.h> +#endif + +using namespace common; + +namespace storage { + +namespace { + +const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1; // magic + version +const int BUF_SIZE = 4096; + +// ----------------------------------------------------------------------------- +// Self-check helpers: read file, parse chunk header, recover chunk statistics +//
----------------------------------------------------------------------------- + +/** + * Lightweight read-only file handle for self-check only. + * Use init_from_fd() when WriteFile is already open to avoid opening the file + * twice (fixes Windows file sharing and ensures we read the same content). + */ +struct SelfCheckReader { + int fd_; + int32_t file_size_; + bool own_fd_; // if false, do not close fd_ + + SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {} + + int init_from_fd(int fd) { + fd_ = fd; + own_fd_ = false; + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + int open(const std::string& path) { +#ifdef _WIN32 + fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY); +#else + fd_ = ::open(path.c_str(), O_RDONLY); +#endif + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } + own_fd_ = true; +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + void close() { + if (own_fd_ && fd_ >= 0) { +#ifdef _WIN32 + ::_close(fd_); +#else + ::close(fd_); +#endif + } + fd_ = -1; + file_size_ = -1; + } + + int32_t file_size() const { return file_size_; } + + int read(int32_t offset, char* buf, int32_t buf_size, int32_t& read_len) { + read_len = 0; + if (fd_ < 0) { + return E_FILE_READ_ERR; + } + ssize_t n = ::pread(fd_, buf, buf_size, offset); + if (n < 0) { + return E_FILE_READ_ERR; + } + read_len = static_cast<int32_t>(n); + return E_OK; + } +}; + +#ifdef _WIN32 +ssize_t pread(int fd, void* buf, 
size_t count, uint64_t offset) { + DWORD read_bytes = 0; + OVERLAPPED ov = {}; + ov.OffsetHigh = (DWORD)((offset >> 32) & 0xFFFFFFFF); + ov.Offset = (DWORD)(offset & 0xFFFFFFFF); + HANDLE h = (HANDLE)_get_osfhandle(fd); + if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) { + if (GetLastError() != ERROR_HANDLE_EOF) { + return -1; + } + } + return (ssize_t)read_bytes; +} +#endif + +/** + * Parse chunk header at chunk_start and compute total chunk size (header + + * data). Does not read full chunk data; used to advance scan position. + * @param header_out If non-null, filled with the deserialized chunk header. + * @param bytes_consumed Set to header_len + data_size on success. + */ +static int parse_chunk_header_and_skip(SelfCheckReader& reader, + int64_t chunk_start, + int64_t& bytes_consumed, + ChunkHeader* header_out = nullptr) { + int32_t file_size = reader.file_size(); + int32_t max_read = static_cast<int32_t>( + std::min(static_cast<int64_t>(BUF_SIZE), file_size - chunk_start)); + if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + std::vector<char> buf(max_read); + int32_t read_len = 0; + int ret = reader.read(static_cast<int32_t>(chunk_start), buf.data(), + max_read, read_len); + if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + + ChunkHeader header; + ret = header.deserialize_from(bs); + if (ret != E_OK) { + return E_TSFILE_CORRUPTED; + } + + int header_len = bs.read_pos(); + int64_t total = header_len + header.data_size_; + if (chunk_start + total > file_size) { + return E_TSFILE_CORRUPTED; + } + + if (header_out != nullptr) { + *header_out = header; + } + bytes_consumed = total; + return E_OK; +} + +/** + * Recover chunk-level statistic from chunk data so that tail metadata can be + * generated correctly after recovery (aligned with Java TsFileSequenceReader + * selfCheck). 
Multi-page: merge each page header's statistic. Single-page: + * decode page data and update stat. For aligned value chunks, time_batch + * (from the time chunk in the same group) must be provided. + */ +static int recover_chunk_statistic( + const ChunkHeader& chdr, const char* chunk_data, int32_t data_size, + Statistic* out_stat, common::PageArena* pa, + const std::vector<int64_t>* time_batch = nullptr, + std::vector<int64_t>* out_time_batch = nullptr) { + if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) { + return E_OK; + } + common::ByteStream bs; + bs.wrap_from(const_cast<char*>(chunk_data), + static_cast<uint32_t>(data_size)); + // Multi-page chunk: high bits of chunk_type_ are 0x00, low 6 bits = + // CHUNK_HEADER_MARKER + const bool multi_page = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x3F) == + static_cast<unsigned char>(CHUNK_HEADER_MARKER); + + if (multi_page) { + while (bs.remaining_size() > 0) { + PageHeader ph; + int ret = ph.deserialize_from(bs, true, chdr.data_type_); + if (ret != common::E_OK) { + return ret; + } + uint32_t comp = ph.compressed_size_; + if (ph.statistic_ != nullptr) { + if (out_stat->merge_with(ph.statistic_) != common::E_OK) { + ph.reset(); + return common::E_TSFILE_CORRUPTED; + } + } + ph.reset(); + bs.wrapped_buf_advance_read_pos(comp); + } + return E_OK; + } + + // Single-page chunk: statistic is not in page header; decompress and decode + // to fill out_stat. is_time_column: bit 0x80 in chunk_type_ indicates time + // column (aligned model). + const bool is_time_column = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != 0; + PageHeader ph; + int ret = ph.deserialize_from(bs, false, chdr.data_type_); + if (ret != common::E_OK || ph.compressed_size_ == 0 || + bs.remaining_size() < ph.compressed_size_) { + return E_OK; + } Review Comment: Why return E_OK? (comment anchored on the early `return E_OK;` above. That return is taken when the single-page header fails to deserialize, or when its compressed body is empty or longer than the remaining chunk bytes. The caller then still records the chunk, with its reset (empty) statistic, instead of treating the chunk as corrupted.)
########## cpp/src/writer/tsfile_writer.cc: ########## @@ -113,11 +115,50 @@ int TsFileWriter::init(WriteFile* write_file) { } write_file_ = write_file; write_file_created_ = false; + io_writer_owned_ = true; io_writer_ = new TsFileIOWriter(); io_writer_->init(write_file_); return E_OK; } +int TsFileWriter::init(RestorableTsFileIOWriter* rw) { + // Initialize from a recovered writer: take schema from file, do not own + // io_writer_ + if (rw == nullptr) { + return E_INVALID_ARG; + } + if (!rw->can_write()) { + return E_INVALID_ARG; + } + write_file_ = rw->get_write_file(); + write_file_created_ = false; + io_writer_owned_ = false; + io_writer_ = rw; // RestorableTsFileIOWriter is also a TsFileIOWriter + std::shared_ptr<Schema> known = rw->get_known_schema(); + for (const auto& kv : known->table_schema_map_) { + const std::string& table_name = kv.first; + const std::shared_ptr<TableSchema>& ts = kv.second; + if (!ts || ts->get_measurement_names().empty()) { + continue; + } + auto device_id = std::make_shared<StringArrayDeviceID>(table_name); + auto* ms_group = new MeasurementSchemaGroup; Review Comment: Explain this (comment anchored on the quoted line above: `auto* ms_group = new MeasurementSchemaGroup;`, a raw allocation made for each recovered table schema inside the loop over `known->table_schema_map_`, next to a `StringArrayDeviceID` built from the table name) ########## cpp/src/file/restorable_tsfile_io_writer.cc: ########## @@ -0,0 +1,868 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "file/restorable_tsfile_io_writer.h" + +#include <fcntl.h> + +#include <algorithm> +#include <cstring> +#include <memory> +#include <vector> + +#include "common/allocator/byte_stream.h" +#include "common/device_id.h" +#include "common/statistic.h" +#include "common/tsfile_common.h" +#include "compress/compressor_factory.h" +#include "encoding/decoder_factory.h" +#include "utils/errno_define.h" + +#ifdef _WIN32 +#include <io.h> +#include <sys/stat.h> +#include <windows.h> +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset); +#else +#include <sys/stat.h> +#include <unistd.h> +#endif + +using namespace common; + +namespace storage { + +namespace { + +const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1; // magic + version +const int BUF_SIZE = 4096; + +// ----------------------------------------------------------------------------- +// Self-check helpers: read file, parse chunk header, recover chunk statistics +// ----------------------------------------------------------------------------- + +/** + * Lightweight read-only file handle for self-check only. + * Use init_from_fd() when WriteFile is already open to avoid opening the file + * twice (fixes Windows file sharing and ensures we read the same content). 
+ */ +struct SelfCheckReader { + int fd_; + int32_t file_size_; + bool own_fd_; // if false, do not close fd_ + + SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {} + + int init_from_fd(int fd) { + fd_ = fd; + own_fd_ = false; + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + int open(const std::string& path) { +#ifdef _WIN32 + fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY); +#else + fd_ = ::open(path.c_str(), O_RDONLY); +#endif + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } + own_fd_ = true; +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast<int32_t>(st.st_size); +#endif + return E_OK; + } + + void close() { + if (own_fd_ && fd_ >= 0) { +#ifdef _WIN32 + ::_close(fd_); +#else + ::close(fd_); +#endif + } + fd_ = -1; + file_size_ = -1; + } + + int32_t file_size() const { return file_size_; } + + int read(int32_t offset, char* buf, int32_t buf_size, int32_t& read_len) { + read_len = 0; + if (fd_ < 0) { + return E_FILE_READ_ERR; + } + ssize_t n = ::pread(fd_, buf, buf_size, offset); + if (n < 0) { + return E_FILE_READ_ERR; + } + read_len = static_cast<int32_t>(n); + return E_OK; + } +}; + +#ifdef _WIN32 +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset) { + DWORD read_bytes = 0; + OVERLAPPED ov = {}; + ov.OffsetHigh = (DWORD)((offset >> 32) & 0xFFFFFFFF); + ov.Offset = (DWORD)(offset & 0xFFFFFFFF); + HANDLE h = (HANDLE)_get_osfhandle(fd); + if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) { + if (GetLastError() != 
ERROR_HANDLE_EOF) { + return -1; + } + } + return (ssize_t)read_bytes; +} +#endif + +/** + * Parse chunk header at chunk_start and compute total chunk size (header + + * data). Does not read full chunk data; used to advance scan position. + * @param header_out If non-null, filled with the deserialized chunk header. + * @param bytes_consumed Set to header_len + data_size on success. + */ +static int parse_chunk_header_and_skip(SelfCheckReader& reader, + int64_t chunk_start, + int64_t& bytes_consumed, + ChunkHeader* header_out = nullptr) { + int32_t file_size = reader.file_size(); + int32_t max_read = static_cast<int32_t>( + std::min(static_cast<int64_t>(BUF_SIZE), file_size - chunk_start)); + if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + std::vector<char> buf(max_read); + int32_t read_len = 0; + int ret = reader.read(static_cast<int32_t>(chunk_start), buf.data(), + max_read, read_len); + if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + + ChunkHeader header; + ret = header.deserialize_from(bs); + if (ret != E_OK) { + return E_TSFILE_CORRUPTED; + } + + int header_len = bs.read_pos(); + int64_t total = header_len + header.data_size_; + if (chunk_start + total > file_size) { + return E_TSFILE_CORRUPTED; + } + + if (header_out != nullptr) { + *header_out = header; + } + bytes_consumed = total; + return E_OK; +} + +/** + * Recover chunk-level statistic from chunk data so that tail metadata can be + * generated correctly after recovery (aligned with Java TsFileSequenceReader + * selfCheck). Multi-page: merge each page header's statistic. Single-page: + * decode page data and update stat. For aligned value chunks, time_batch + * (from the time chunk in the same group) must be provided. 
+ */ +static int recover_chunk_statistic( + const ChunkHeader& chdr, const char* chunk_data, int32_t data_size, + Statistic* out_stat, common::PageArena* pa, + const std::vector<int64_t>* time_batch = nullptr, + std::vector<int64_t>* out_time_batch = nullptr) { + if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) { + return E_OK; + } + common::ByteStream bs; + bs.wrap_from(const_cast<char*>(chunk_data), + static_cast<uint32_t>(data_size)); + // Multi-page chunk: high bits of chunk_type_ are 0x00, low 6 bits = + // CHUNK_HEADER_MARKER + const bool multi_page = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x3F) == + static_cast<unsigned char>(CHUNK_HEADER_MARKER); + + if (multi_page) { + while (bs.remaining_size() > 0) { + PageHeader ph; + int ret = ph.deserialize_from(bs, true, chdr.data_type_); + if (ret != common::E_OK) { + return ret; + } + uint32_t comp = ph.compressed_size_; + if (ph.statistic_ != nullptr) { + if (out_stat->merge_with(ph.statistic_) != common::E_OK) { + ph.reset(); + return common::E_TSFILE_CORRUPTED; + } + } + ph.reset(); + bs.wrapped_buf_advance_read_pos(comp); + } + return E_OK; + } + + // Single-page chunk: statistic is not in page header; decompress and decode + // to fill out_stat. is_time_column: bit 0x80 in chunk_type_ indicates time + // column (aligned model). 
+ const bool is_time_column = + (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != 0; + PageHeader ph; + int ret = ph.deserialize_from(bs, false, chdr.data_type_); + if (ret != common::E_OK || ph.compressed_size_ == 0 || + bs.remaining_size() < ph.compressed_size_) { + return E_OK; + } + const char* compressed_ptr = + chunk_data + (data_size - static_cast<int32_t>(bs.remaining_size())); + char* uncompressed_buf = nullptr; + uint32_t uncompressed_size = 0; + Compressor* compressor = + CompressorFactory::alloc_compressor(chdr.compression_type_); + if (compressor == nullptr) { + return common::E_OOM; + } + ret = compressor->reset(false); + if (ret != common::E_OK) { + CompressorFactory::free(compressor); + return ret; + } + ret = compressor->uncompress(const_cast<char*>(compressed_ptr), + ph.compressed_size_, uncompressed_buf, + uncompressed_size); + if (ret != common::E_OK || uncompressed_buf == nullptr || + uncompressed_size != ph.uncompressed_size_) { + if (uncompressed_buf != nullptr) { + compressor->after_uncompress(uncompressed_buf); + } + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + if (is_time_column) { + /* Time chunk: uncompressed = raw time stream only (no var_uint). 
*/ + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(uncompressed_buf, uncompressed_size); + time_decoder->reset(); + int64_t t; + if (out_time_batch != nullptr) { + out_time_batch->clear(); + } + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + out_stat->update(t); + if (out_time_batch != nullptr) { + out_time_batch->push_back(t); + } + } + DecoderFactory::free(time_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + + /* Value chunk: parse layout and decode. */ + const char* value_buf = nullptr; + uint32_t value_buf_size = 0; + std::vector<int64_t> time_decode_buf; + const std::vector<int64_t>* times = nullptr; + + if (time_batch != nullptr && !time_batch->empty()) { + // Aligned value page: uncompressed layout = uint32(num_values) + bitmap + // + value_buf + if (uncompressed_size < 4) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + uint32_t num_values = + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[0])) + << 24) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[1])) + << 16) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[2])) + << 8) | + (static_cast<uint32_t>( + static_cast<unsigned char>(uncompressed_buf[3]))); + uint32_t bitmap_size = (num_values + 7) / 8; + if (uncompressed_size < 4 + bitmap_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + value_buf = uncompressed_buf + 4 + bitmap_size; + value_buf_size = uncompressed_size - 4 - bitmap_size; + times = time_batch; + } else { + // Non-aligned value page: 
var_uint(time_buf_size) + time_buf + + // value_buf + int var_size = 0; + uint32_t time_buf_size = 0; + ret = common::SerializationUtil::read_var_uint( + time_buf_size, uncompressed_buf, + static_cast<int>(uncompressed_size), &var_size); + if (ret != common::E_OK || + static_cast<uint32_t>(var_size) + time_buf_size > + uncompressed_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + const char* time_buf = uncompressed_buf + var_size; + value_buf = time_buf + time_buf_size; + value_buf_size = + uncompressed_size - static_cast<uint32_t>(var_size) - time_buf_size; + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(const_cast<char*>(time_buf), time_buf_size); + time_decoder->reset(); + time_decode_buf.clear(); + int64_t t; + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + time_decode_buf.push_back(t); + } + DecoderFactory::free(time_decoder); + times = &time_decode_buf; + } + + Decoder* value_decoder = DecoderFactory::alloc_value_decoder( + chdr.encoding_type_, chdr.data_type_); + if (value_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream value_in; + value_in.wrap_from(const_cast<char*>(value_buf), value_buf_size); + value_decoder->reset(); + size_t idx = 0; + const size_t num_times = times->size(); + while (idx < num_times && value_decoder->has_remaining(value_in)) { + int64_t t = (*times)[idx]; + switch (chdr.data_type_) { + case common::BOOLEAN: { + bool v; + if (value_decoder->read_boolean(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + 
break; + } + case common::INT32: + case common::DATE: { + int32_t v; + if (value_decoder->read_int32(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::INT64: + case common::TIMESTAMP: { + int64_t v; + if (value_decoder->read_int64(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::FLOAT: { + float v; + if (value_decoder->read_float(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::DOUBLE: { + double v; + if (value_decoder->read_double(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::TEXT: + case common::BLOB: + case common::STRING: { + common::String v; + if (pa != nullptr && value_decoder->read_String( + v, *pa, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + default: + break; + } + idx++; + } + DecoderFactory::free(value_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; +} + +} // namespace + +RestorableTsFileIOWriter::RestorableTsFileIOWriter() + : TsFileIOWriter(), + write_file_(nullptr), + write_file_owned_(false), + truncated_size_(-1), + crashed_(false), + can_write_(false) { + self_check_arena_.init(512, MOD_TSFILE_READER); +} + +RestorableTsFileIOWriter::~RestorableTsFileIOWriter() { close(); } + +void RestorableTsFileIOWriter::close() { + if (write_file_owned_ && write_file_ != nullptr) { + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + } + for (ChunkGroupMeta* cgm : self_check_recovered_cgm_) { + cgm->device_id_.reset(); + } + self_check_recovered_cgm_.clear(); + self_check_arena_.destroy(); +} + +int RestorableTsFileIOWriter::open(const std::string& file_path, + bool truncate_corrupted) { + if (write_file_ != nullptr) { + return E_ALREADY_EXIST; + } + + file_path_ = file_path; + write_file_ = new WriteFile(); + write_file_owned_ = true; + + // 
O_RDWR|O_CREAT without O_TRUNC: preserve existing file content +#ifdef _WIN32 + const int flags = O_RDWR | O_CREAT | O_BINARY; +#else + const int flags = O_RDWR | O_CREAT; +#endif + const mode_t mode = 0644; + + int ret = write_file_->create(file_path_, flags, mode); + if (ret != E_OK) { + close(); + return ret; + } + + ret = self_check(truncate_corrupted); + if (ret != E_OK) { + close(); + return ret; + } + + return E_OK; +} + +int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) { + SelfCheckReader reader; + // Use a separate read-only handle for self-check: on Windows, sharing the + // O_RDWR fd can cause stale/cached reads when detecting a complete file. + int ret = reader.open(file_path_); + if (ret != E_OK) { + return ret; + } + + int32_t file_size = reader.file_size(); + + // --- Empty file: treat as crashed, allow writing from scratch --- + if (file_size == 0) { + reader.close(); + truncated_size_ = 0; + crashed_ = true; + can_write_ = true; + if (write_file_->seek_to_end() != E_OK) { + return E_FILE_READ_ERR; + } + ret = init(write_file_); + if (ret != E_OK) { + return ret; + } + ret = start_file(); + if (ret != E_OK) { + return ret; + } + return E_OK; + } + + // --- File too short or invalid header => not a valid TsFile --- + if (file_size < HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + char header_buf[HEADER_LEN]; + int32_t read_len = 0; + ret = reader.read(0, header_buf, HEADER_LEN, read_len); + if (ret != E_OK || read_len != HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (memcmp(header_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) != 0) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (header_buf[MAGIC_STRING_TSFILE_LEN] != VERSION_NUM_BYTE) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + 
} + + // --- Completeness check (aligned with Java isComplete()) --- + // Require size >= 2*magic + version_byte and tail magic same as head magic. + bool is_complete = false; + if (file_size >= static_cast<int32_t>(MAGIC_STRING_TSFILE_LEN * 2 + 1)) { + char tail_buf[MAGIC_STRING_TSFILE_LEN]; + ret = reader.read(file_size - MAGIC_STRING_TSFILE_LEN, tail_buf, + MAGIC_STRING_TSFILE_LEN, read_len); + if (ret == E_OK && read_len == MAGIC_STRING_TSFILE_LEN && + memcmp(tail_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) == + 0) { + is_complete = true; + } + } + + // --- File is complete: no recovery, close write handle and return --- + if (is_complete) { + reader.close(); + truncated_size_ = TSFILE_CHECK_COMPLETE; + crashed_ = false; + can_write_ = false; + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + return E_OK; + } + + // --- Recovery path: scan from header to find last valid truncation point + // --- + int64_t truncated = HEADER_LEN; + int64_t pos = HEADER_LEN; + std::vector<char> buf(BUF_SIZE); + + // Recover schema and chunk group meta (aligned with Java selfCheck). + // cur_group_time_batch: timestamps decoded from time chunk, used by aligned + // value chunks. 
+ std::shared_ptr<IDeviceID> cur_device_id; + ChunkGroupMeta* cur_cgm = nullptr; + std::vector<ChunkGroupMeta*> recovered_cgm_list; + std::vector<int64_t> cur_group_time_batch; + + auto flush_chunk_group = [this, &cur_device_id, &cur_cgm, + &recovered_cgm_list]() { + if (cur_cgm != nullptr && cur_device_id != nullptr) { + get_schema()->update_table_schema(cur_cgm); + recovered_cgm_list.push_back(cur_cgm); + self_check_recovered_cgm_.push_back(cur_cgm); + cur_cgm = nullptr; + } + }; + + while (pos < file_size) { + unsigned char marker; + ret = reader.read(static_cast<int32_t>(pos), + reinterpret_cast<char*>(&marker), 1, read_len); + if (ret != E_OK || read_len != 1) { + break; + } + pos += 1; + + if (marker == static_cast<unsigned char>(SEPARATOR_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + break; + } + + if (marker == static_cast<unsigned char>(CHUNK_GROUP_HEADER_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + cur_group_time_batch.clear(); + int seg_len = 0; + ret = reader.read(static_cast<int32_t>(pos), buf.data(), BUF_SIZE, + read_len); + if (ret != E_OK || read_len < 1) { + break; + } + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + cur_device_id = std::make_shared<StringArrayDeviceID>("init"); + ret = cur_device_id->deserialize(bs); + if (ret != E_OK) { + break; + } + seg_len = bs.read_pos(); + pos += seg_len; + cur_cgm = new (self_check_arena_.alloc(sizeof(ChunkGroupMeta))) + ChunkGroupMeta(&self_check_arena_); + cur_cgm->init(cur_device_id); + continue; + } + + if (marker == static_cast<unsigned char>(OPERATION_INDEX_RANGE)) { + truncated = pos - 1; + flush_chunk_group(); + cur_device_id.reset(); + if (pos + 2 * 8 > static_cast<int64_t>(file_size)) { + break; + } + char range_buf[16]; + ret = + reader.read(static_cast<int32_t>(pos), range_buf, 16, read_len); + if (ret != E_OK || read_len != 16) { + break; + } + pos += 16; + truncated = pos; + continue; + } + + if (marker == static_cast<unsigned char>(CHUNK_HEADER_MARKER) || + 
marker == + static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast<unsigned char>(CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)) { + int64_t chunk_start = pos - 1; + int64_t consumed = 0; + ChunkHeader chdr; + ret = parse_chunk_header_and_skip(reader, chunk_start, consumed, + &chdr); + if (ret != E_OK) { + break; + } + pos = chunk_start + consumed; + truncated = pos; + if (cur_cgm != nullptr) { + void* cm_buf = self_check_arena_.alloc(sizeof(ChunkMeta)); + if (IS_NULL(cm_buf)) { + ret = common::E_OOM; + break; + } + auto* cm = new (cm_buf) ChunkMeta(); + common::String mname; + mname.dup_from(chdr.measurement_name_, self_check_arena_); + Statistic* stat = StatisticFactory::alloc_statistic_with_pa( + static_cast<common::TSDataType>(chdr.data_type_), + &self_check_arena_); + if (IS_NULL(stat)) { + ret = common::E_OOM; + break; + } + stat->reset(); + if (chdr.data_size_ > 0) { + const int32_t header_len = + static_cast<int32_t>(consumed) - chdr.data_size_; + if (header_len > 0 && chunk_start + consumed <= + static_cast<int64_t>(file_size)) { + std::vector<char> chunk_data(chdr.data_size_); + int32_t read_len = 0; + ret = reader.read( + static_cast<int32_t>(chunk_start + header_len), + chunk_data.data(), chdr.data_size_, read_len); + if (ret == E_OK && read_len == chdr.data_size_) { + ret = recover_chunk_statistic( + chdr, chunk_data.data(), chdr.data_size_, stat, + &self_check_arena_, &cur_group_time_batch, + &cur_group_time_batch); + } + if (ret != E_OK) { + break; + } + } + } + cm->init(mname, + static_cast<common::TSDataType>(chdr.data_type_), + chunk_start, stat, 0, + static_cast<common::TSEncoding>(chdr.encoding_type_), + static_cast<common::CompressionType>( + chdr.compression_type_), + self_check_arena_); + cur_cgm->push(cm); + if (cur_device_id != nullptr && + (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != + 0) { + 
aligned_devices_.insert(cur_device_id->get_table_name()); + } + } + continue; + } + + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + flush_chunk_group(); + reader.close(); + return E_TSFILE_CORRUPTED; + } + + flush_chunk_group(); + get_schema()->finalize_table_schemas(); + reader.close(); + truncated_size_ = truncated; + + // --- Optionally truncate file to last valid offset --- + if (truncate_corrupted && truncated < static_cast<int64_t>(file_size)) { + ret = write_file_->truncate(truncated); + if (ret != E_OK) { + return ret; + } + } + + if (write_file_->seek_to_end() != E_OK) { + return E_FILE_READ_ERR; + } + + crashed_ = true; + can_write_ = true; + + ret = init(write_file_); + if (ret != E_OK) { + return ret; + } + + // --- Restore write_stream_ from file content (recovery only) --- + // So that cur_file_position() is correct when generating tail metadata + // later. flush_stream_to_file() will skip these leading bytes + // (set_flush_skip_leading). + const int64_t restored_size = write_file_->get_position(); + if (restored_size > 0) { + const int read_chunk = 65536; + std::vector<char> read_buf(read_chunk); + int64_t offset = 0; + while (offset < restored_size) { + int64_t to_read = std::min(static_cast<int64_t>(read_chunk), + restored_size - offset); + ssize_t nr = -1; +#ifdef _WIN32 + nr = pread(write_file_->get_fd(), read_buf.data(), + static_cast<size_t>(to_read), + static_cast<uint64_t>(offset)); +#else + nr = ::pread(write_file_->get_fd(), read_buf.data(), + static_cast<size_t>(to_read), offset); +#endif + if (nr <= 0) { + ret = E_FILE_READ_ERR; + break; + } + if (write_buf(read_buf.data(), static_cast<uint32_t>(nr)) != E_OK) { + ret = E_FILE_WRITE_ERR; + break; + } + offset += nr; + } + if (ret == E_OK) { + set_flush_skip_leading(restored_size); + } else { + return ret; + } + } Review Comment: This seems very expensive: it reads the entire recovered file back into write_stream_ just so that cur_file_position() is correct. Isn't there a simpler way? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
