jt2594838 commented on code in PR #734:
URL: https://github.com/apache/tsfile/pull/734#discussion_r2998729221
##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -764,35 +856,217 @@ int TsFileWriter::write_tablet_aligned(const Tablet& tablet) {
data_types))) {
return ret;
}
- for (uint32_t row = 0; row < tablet.get_cur_row_size(); row++) {
- int32_t time_pages_before = time_chunk_writer->num_of_pages();
- std::vector<int32_t> value_pages_before(value_chunk_writers.size(), 0);
+ const uint32_t total_rows = tablet.get_cur_row_size();
+ const bool strict_page_size = common::g_config_value_.strict_page_size_;
+
+ // Decide whether we have string/blob/text columns.
+ bool has_varlen_column = false;
+ for (uint32_t i = 0; i < data_types.size(); i++) {
+ if (data_types[i] == common::STRING || data_types[i] == common::TEXT ||
+ data_types[i] == common::BLOB) {
+ has_varlen_column = true;
+ break;
+ }
+ }
+
+ // Keep writers' seal-check behavior consistent across calls.
+ time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(
+ strict_page_size);
+ }
+ }
+
+ if (strict_page_size) {
+ // Strict mode: keep the original row-based insertion to ensure aligned
+ // pages seal together when either side becomes full.
+ for (uint32_t row = 0; row < total_rows; row++) {
+ int32_t time_pages_before = time_chunk_writer->num_of_pages();
+ std::vector<int32_t> value_pages_before(value_chunk_writers.size(),
+ 0);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writer)) {
+ value_pages_before[c] = value_chunk_writer->num_of_pages();
+ }
+ }
+
+ if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) {
+ return ret;
+ }
+ ASSERT(value_chunk_writers.size() == tablet.get_column_count());
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (IS_NULL(value_chunk_writer)) {
+ continue;
+ }
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c,
+ row, row + 1))) {
+ return ret;
+ }
+ }
+ if (RET_FAIL(maybe_seal_aligned_pages_together(
+ time_chunk_writer, value_chunk_writers, time_pages_before,
+ value_pages_before))) {
+ return ret;
+ }
+ }
+ return ret;
+ }
+
+ // Non-strict mode: switch to column-based insertion.
+ if (!has_varlen_column) {
+ // Optimization: when there is no string/blob/text column, we only need
+ // to split by point-number so that each split will trigger a page
+ // seal (and avoid the per-row page-size check).
+ const uint32_t points_per_page =
+ common::g_config_value_.page_writer_max_point_num_;
+
+ // Disable auto page sealing. We will seal pages at split boundaries.
+ time_chunk_writer->set_enable_page_seal_if_full(false);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
- ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
- if (!IS_NULL(value_chunk_writer)) {
- value_pages_before[c] = value_chunk_writer->num_of_pages();
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(false);
}
}
- if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) {
- return ret;
+ // Determine how many points we need to fill the current unsealed time
+ // page (it may already contain data from previous tablets).
+ uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+ if (time_cur_points >= points_per_page &&
+ time_chunk_writer->has_current_page_data()) {
+ // Close the already-full page together with all aligned value
+ // pages.
+ if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writer) &&
+ value_chunk_writer->has_current_page_data()) {
+ if (RET_FAIL(value_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ }
+ time_cur_points = 0;
+ }
+ const uint32_t first_seg_len =
+ (time_cur_points > 0 && time_cur_points < points_per_page)
+ ? (points_per_page - time_cur_points)
+ : points_per_page;
+
+ // 1) Write time in segments and seal all full segments (except the
+ // last remaining segment).
+ uint32_t seg_start = 0;
+ uint32_t seg_len = first_seg_len;
+ while (seg_start < total_rows) {
+ const uint32_t seg_end = std::min(seg_start + seg_len, total_rows);
+ if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
seg_start,
+ seg_end))) {
+ return ret;
+ }
+ seg_start = seg_end;
+ if (seg_start < total_rows) {
+ if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ seg_len = points_per_page;
}
+
+ // 2) Write each value column in the same segments.
ASSERT(value_chunk_writers.size() == tablet.get_column_count());
- for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
- ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ for (uint32_t col = 0; col < value_chunk_writers.size(); col++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[col];
if (IS_NULL(value_chunk_writer)) {
continue;
}
- if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, row,
- row + 1))) {
- return ret;
+
+ seg_start = 0;
+ seg_len = first_seg_len;
+ while (seg_start < total_rows) {
+ const uint32_t seg_end =
+ std::min(seg_start + seg_len, total_rows);
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
col,
+ seg_start, seg_end))) {
+ return ret;
+ }
+ seg_start = seg_end;
+ if (seg_start < total_rows) {
+ if (value_chunk_writer->has_current_page_data() &&
+ RET_FAIL(value_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ seg_len = points_per_page;
}
}
- if (RET_FAIL(maybe_seal_aligned_pages_together(
- time_chunk_writer, value_chunk_writers, time_pages_before,
- value_pages_before))) {
+ return ret;
+ }
+
+ // General non-strict (may have varlen STRING/TEXT/BLOB columns):
+ // time auto-seals to provide aligned page boundaries; value writers
+ // skip auto page sealing and are sealed manually at time boundaries.
+ // Attention: since value-side auto-seal is disabled, if a varlen value
+ // page hits the memory threshold earlier, it may not seal immediately
+ // and instead will be sealed later at the recorded time-page boundaries
+ // (this may sacrifice the strict page size limit for performance).
+ time_chunk_writer->set_enable_page_seal_if_full(true);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(false);
+ }
+ }
+
+ std::vector<uint32_t> time_page_row_ends;
+ time_page_row_ends.reserve(total_rows / 16 + 1);
Review Comment:
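What is this 16? What does it represent here? If it is just an estimate of rows per page used as a `reserve()` capacity hint, please replace it with a named constant or add a comment explaining where the value comes from.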
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]