hongzhi-gao commented on code in PR #731:
URL: https://github.com/apache/tsfile/pull/731#discussion_r2876708643


##########
cpp/src/file/restorable_tsfile_io_writer.cc:
##########
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "file/restorable_tsfile_io_writer.h"
+
+#include <fcntl.h>
+
+#include <algorithm>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "common/device_id.h"
+#include "common/statistic.h"
+#include "common/tsfile_common.h"
+#include "compress/compressor_factory.h"
+#include "encoding/decoder_factory.h"
+#include "utils/errno_define.h"
+
+#ifdef _WIN32
+#include <io.h>
+#include <sys/stat.h>
+#include <windows.h>
+ssize_t pread(int fd, void* buf, size_t count, uint64_t offset);
+#else
+#include <sys/stat.h>
+#include <unistd.h>
+#endif
+
+using namespace common;
+
+namespace storage {
+
+namespace {
+
+const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1;  // magic string + 1 version byte
+const int BUF_SIZE = 4096;  // max bytes read per chunk-header probe in parse_chunk_header_and_skip
+
+// -----------------------------------------------------------------------------
+// Self-check helpers: read file, parse chunk header, recover chunk statistics
+// -----------------------------------------------------------------------------
+
+/**
+ * Lightweight read-only file handle for self-check only.
+ * Use init_from_fd() when WriteFile is already open to avoid opening the file
+ * twice (fixes Windows file sharing and ensures we read the same content).
+ *
+ * Ownership: open() owns the descriptor and close() releases it;
+ * init_from_fd() only borrows the descriptor, and close() then just resets
+ * the reader's state. There is no destructor, so a reader set up with open()
+ * must be closed explicitly or the descriptor leaks.
+ *
+ * NOTE(review): file_size_ and the read() offset are int32_t, so st_size is
+ * truncated for files of 2 GiB or more - confirm whether larger TsFiles must
+ * be supported (int64_t would be needed).
+ */
+struct SelfCheckReader {
+    int fd_;  // -1 when no descriptor is attached
+    int32_t file_size_;  // size captured by fstat at attach time; -1 when unset
+    bool own_fd_;  // true: opened by open() and closed by close(); false: borrowed, never closed here
+
+    SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {}
+
+    int init_from_fd(int fd) {  // attach to a caller-owned descriptor and record its size
+        fd_ = fd;
+        own_fd_ = false;
+        if (fd_ < 0) {  // caller passed an invalid descriptor
+            return E_FILE_OPEN_ERR;
+        }
+#ifdef _WIN32
+        struct _stat st;
+        if (_fstat(fd_, &st) < 0) {
+            return E_FILE_STAT_ERR;
+        }
+        file_size_ = static_cast<int32_t>(st.st_size);
+#else
+        struct stat st;
+        if (fstat(fd_, &st) < 0) {
+            return E_FILE_STAT_ERR;
+        }
+        file_size_ = static_cast<int32_t>(st.st_size);
+#endif
+        return E_OK;
+    }
+
+    int open(const std::string& path) {  // open path read-only; the reader owns the new descriptor
+#ifdef _WIN32
+        fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY);
+#else
+        fd_ = ::open(path.c_str(), O_RDONLY);
+#endif
+        if (fd_ < 0) {
+            return E_FILE_OPEN_ERR;
+        }
+        own_fd_ = true;
+#ifdef _WIN32
+        struct _stat st;
+        if (_fstat(fd_, &st) < 0) {
+            close();  // release the descriptor opened above before reporting failure
+            return E_FILE_STAT_ERR;
+        }
+        file_size_ = static_cast<int32_t>(st.st_size);
+#else
+        struct stat st;
+        if (fstat(fd_, &st) < 0) {
+            close();  // release the descriptor opened above before reporting failure
+            return E_FILE_STAT_ERR;
+        }
+        file_size_ = static_cast<int32_t>(st.st_size);
+#endif
+        return E_OK;
+    }
+
+    void close() {  // closes only an owned descriptor; always resets fd_ and file_size_
+        if (own_fd_ && fd_ >= 0) {
+#ifdef _WIN32
+            ::_close(fd_);
+#else
+            ::close(fd_);
+#endif
+        }
+        fd_ = -1;
+        file_size_ = -1;
+    }
+
+    int32_t file_size() const { return file_size_; }  // -1 if not attached
+
+    int read(int32_t offset, char* buf, int32_t buf_size, int32_t& read_len) {  // positional read; does not loop on short reads
+        read_len = 0;
+        if (fd_ < 0) {  // no descriptor attached
+            return E_FILE_READ_ERR;
+        }
+        ssize_t n = ::pread(fd_, buf, buf_size, offset);  // NOTE(review): on _WIN32 this names the global ::pread declared after the includes, not the pread defined inside storage's anonymous namespace - verify it links
+        if (n < 0) {
+            return E_FILE_READ_ERR;
+        }
+        read_len = static_cast<int32_t>(n);  // actual byte count; may be < buf_size (e.g. at EOF)
+        return E_OK;
+    }
+};
+
+#ifdef _WIN32  // pread() replacement for Windows built on ReadFile with an OVERLAPPED offset
+ssize_t pread(int fd, void* buf, size_t count, uint64_t offset) {  // NOTE(review): this is storage::{anonymous}::pread, not the global ::pread declared after the includes - confirm ::pread callers link
+    DWORD read_bytes = 0;  // bytes actually read; returned even when EOF is hit
+    OVERLAPPED ov = {};  // zero-initialized; only the offset fields are set below
+    ov.OffsetHigh = (DWORD)((offset >> 32) & 0xFFFFFFFF);  // upper 32 bits of the file offset
+    ov.Offset = (DWORD)(offset & 0xFFFFFFFF);  // lower 32 bits of the file offset
+    HANDLE h = (HANDLE)_get_osfhandle(fd);  // CRT descriptor -> Win32 handle
+    if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) {  // NOTE(review): per the ReadFile docs, on a non-overlapped handle this also moves the file pointer, unlike POSIX pread - confirm this is harmless for a descriptor shared via init_from_fd()
+        if (GetLastError() != ERROR_HANDLE_EOF) {  // EOF is not an error: fall through and return read_bytes
+            return -1;
+        }
+    }
+    return (ssize_t)read_bytes;
+}
+#endif
+
+/**
+ * Parse chunk header at chunk_start and compute total chunk size (header +
+ * data). Does not read full chunk data; used to advance scan position.
+ *
+ * At most BUF_SIZE bytes starting at chunk_start are read, and the header is
+ * deserialized from that window only; a header longer than the window
+ * presumably fails to deserialize and is then reported as corruption.
+ * @param reader Reader attached to the file; its file_size() bounds all reads.
+ * @param chunk_start Absolute file offset at which the chunk header begins.
+ * @param bytes_consumed Set to header_len + data_size on success.
+ * @param header_out If non-null, filled with the deserialized chunk header.
+ * @return E_OK on success. E_TSFILE_CORRUPTED when fewer than
+ *         ChunkHeader::MIN_SERIALIZED_SIZE bytes are available, the read
+ *         fails, the header cannot be deserialized, or header + data would
+ *         end past file_size(). On failure the outputs are left unchanged.
+ */
+static int parse_chunk_header_and_skip(SelfCheckReader& reader,
+                                       int64_t chunk_start,
+                                       int64_t& bytes_consumed,
+                                       ChunkHeader* header_out = nullptr) {
+    int32_t file_size = reader.file_size();
+    int32_t max_read = static_cast<int32_t>(  // window = min(BUF_SIZE, bytes left after chunk_start)
+        std::min(static_cast<int64_t>(BUF_SIZE), file_size - chunk_start));
+    if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) {  // also true when chunk_start >= file_size
+        return E_TSFILE_CORRUPTED;
+    }
+
+    std::vector<char> buf(max_read);
+    int32_t read_len = 0;
+    int ret = reader.read(static_cast<int32_t>(chunk_start), buf.data(),  // NOTE(review): offset narrowed to int32_t
+                          max_read, read_len);
+    if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {  // I/O errors and short reads are reported as corruption
+        return E_TSFILE_CORRUPTED;
+    }
+
+    ByteStream bs;
+    bs.wrap_from(buf.data(), read_len);
+
+    ChunkHeader header;
+    ret = header.deserialize_from(bs);
+    if (ret != E_OK) {
+        return E_TSFILE_CORRUPTED;
+    }
+
+    int header_len = bs.read_pos();  // serialized header length = bytes consumed from bs
+    int64_t total = header_len + header.data_size_;
+    if (chunk_start + total > file_size) {  // chunk would extend past the end of the file
+        return E_TSFILE_CORRUPTED;
+    }
+
+    if (header_out != nullptr) {
+        *header_out = header;
+    }
+    bytes_consumed = total;
+    return E_OK;
+}
+
+/**
+ * Recover chunk-level statistic from chunk data so that tail metadata can be
+ * generated correctly after recovery (aligned with Java TsFileSequenceReader
+ * selfCheck). Multi-page: merge each page header's statistic. Single-page:
+ * decode page data and update stat. For aligned value chunks, time_batch
+ * (from the time chunk in the same group) must be provided.
+ */
+static int recover_chunk_statistic(
+    const ChunkHeader& chdr, const char* chunk_data, int32_t data_size,
+    Statistic* out_stat, common::PageArena* pa,
+    const std::vector<int64_t>* time_batch = nullptr,
+    std::vector<int64_t>* out_time_batch = nullptr) {
+    if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) {
+        return E_OK;
+    }
+    common::ByteStream bs;
+    bs.wrap_from(const_cast<char*>(chunk_data),
+                 static_cast<uint32_t>(data_size));
+    // Multi-page chunk: high bits of chunk_type_ are 0x00, low 6 bits =
+    // CHUNK_HEADER_MARKER
+    const bool multi_page =
+        (static_cast<unsigned char>(chdr.chunk_type_) & 0x3F) ==
+        static_cast<unsigned char>(CHUNK_HEADER_MARKER);
+
+    if (multi_page) {
+        while (bs.remaining_size() > 0) {
+            PageHeader ph;
+            int ret = ph.deserialize_from(bs, true, chdr.data_type_);
+            if (ret != common::E_OK) {
+                return ret;
+            }
+            uint32_t comp = ph.compressed_size_;
+            if (ph.statistic_ != nullptr) {
+                if (out_stat->merge_with(ph.statistic_) != common::E_OK) {
+                    ph.reset();
+                    return common::E_TSFILE_CORRUPTED;
+                }
+            }
+            ph.reset();
+            bs.wrapped_buf_advance_read_pos(comp);
+        }
+        return E_OK;
+    }
+
+    // Single-page chunk: statistic is not in page header; decompress and 
decode
+    // to fill out_stat. is_time_column: bit 0x80 in chunk_type_ indicates time
+    // column (aligned model).
+    const bool is_time_column =
+        (static_cast<unsigned char>(chdr.chunk_type_) & 0x80) != 0;
+    PageHeader ph;
+    int ret = ph.deserialize_from(bs, false, chdr.data_type_);
+    if (ret != common::E_OK || ph.compressed_size_ == 0 ||
+        bs.remaining_size() < ph.compressed_size_) {
+        return E_OK;
+    }

Review Comment:
   Fixed. I aligned this branch with the Java selfCheck behavior:
   malformed or incomplete single-page chunk data is now treated as
   corruption (E_TSFILE_CORRUPTED) instead of being silently skipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to