This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 48ee1e8b5 [compaction] Code cleanup and readability improvement
48ee1e8b5 is described below

commit 48ee1e8b5d7792384178c75840a998e413aaa512
Author: Ashwani Raina <ara...@cloudera.com>
AuthorDate: Tue Jan 23 17:50:11 2024 +0530

    [compaction] Code cleanup and readability improvement
    
    This is a base patch that does not change any functionality.
    The goal is to break the compaction memory usage improvement
    change into smaller ones to make it easier to review.
    
    Change-Id: I54709b5e27751581c889854911323fbddab1c4ab
    Reviewed-on: http://gerrit.cloudera.org:8080/21098
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/tablet/compaction.cc       | 269 ++++++++++++++++++++++--------------
 src/kudu/tablet/compaction.h        |  17 ++-
 src/kudu/tablet/delta_compaction.cc |  16 +--
 3 files changed, 178 insertions(+), 124 deletions(-)

diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index d56b79b78..9ad3a1576 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -990,11 +990,11 @@ Mutation* MergeUndoHistories(Mutation* left, Mutation* 
right) {
 // and adds them to 'new_undo_head'.
 Status MergeDuplicatedRowHistory(const string& tablet_id,
                                  const scoped_refptr<FsErrorManager>& 
error_manager,
-                                 CompactionInputRow* old_row,
-                                 Mutation** new_undo_head,
+                                 const CompactionInputRow& old_row,
                                  Arena* arena,
-                                 const HistoryGcOpts& history_gc_opts) {
-  if (PREDICT_TRUE(old_row->previous_ghost == nullptr)) return Status::OK();
+                                 const HistoryGcOpts& history_gc_opts,
+                                 Mutation** new_undo_head) {
+  if (PREDICT_TRUE(old_row.previous_ghost == nullptr)) return Status::OK();
 
   // Use an all inclusive snapshot as all of the previous version's undos and redos
   // are guaranteed to be committed, otherwise the compaction wouldn't be able to
@@ -1003,7 +1003,7 @@ Status MergeDuplicatedRowHistory(const string& tablet_id,
 
   faststring dst;
 
-  CompactionInputRow* previous_ghost = old_row->previous_ghost;
+  CompactionInputRow* previous_ghost = old_row.previous_ghost;
   while (previous_ghost != nullptr) {
 
     // First step is to transform the old rows REDO's into UNDOs, if there are any.
@@ -1016,11 +1016,11 @@ Status MergeDuplicatedRowHistory(const string& 
tablet_id,
 
     RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(all_snap,
                                                  *previous_ghost,
-                                                 &pv_new_undos_head,
-                                                 &pv_delete_redo,
                                                  arena,
+                                                 history_gc_opts,
                                                  &previous_ghost->row,
-                                                 history_gc_opts));
+                                                 &pv_new_undos_head,
+                                                 &pv_delete_redo));
 
     // We should be left with only one redo, the delete.
 #ifndef NDEBUG
@@ -1179,13 +1179,11 @@ void RowSetsInCompactionOrFlush::DumpToLog() const {
   }
 }
 
-void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
-                        Mutation** undo_head,
+bool RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
                         const Mutation* redo_head,
-                        bool* is_garbage_collected) {
-  *is_garbage_collected = false;
+                        Mutation** undo_head) {
   if (!history_gc_opts.gc_enabled()) {
-    return;
+    return false;
   }
 
   // Make sure there is at most one REDO in the redo_head and that, if present, it's a DELETE.
@@ -1195,8 +1193,7 @@ void RemoveAncientUndos(const HistoryGcOpts& 
history_gc_opts,
 
     // Garbage collect rows that are deleted before the AHM.
     if (history_gc_opts.IsAncientHistory(redo_head->timestamp())) {
-      *is_garbage_collected = true;
-      return;
+      return true;
     }
   }
 
@@ -1218,6 +1215,7 @@ void RemoveAncientUndos(const HistoryGcOpts& 
history_gc_opts,
     prev_undo = undo_mut;
     undo_mut = undo_mut->next();
   }
+  return false;
 }
 
 // Applies the REDOs of 'src_row' in accordance with the input snapshot,
@@ -1228,11 +1226,11 @@ void RemoveAncientUndos(const HistoryGcOpts& 
history_gc_opts,
 // NOTE: input REDOs are expected to be in increasing timestamp order.
 Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
                                       const CompactionInputRow& src_row,
-                                      Mutation** new_undo_head,
-                                      Mutation** new_redo_head,
                                       Arena* arena,
+                                      const HistoryGcOpts& history_gc_opts,
                                       RowBlockRow* dst_row,
-                                      const HistoryGcOpts& history_gc_opts) {
+                                      Mutation** new_undo_head,
+                                      Mutation** new_redo_head) {
   bool is_deleted = false;
 
   #define ERROR_LOG_CONTEXT \
@@ -1390,6 +1388,135 @@ Status ApplyMutationsAndGenerateUndos(const 
MvccSnapshot& snap,
   #undef ERROR_LOG_CONTEXT
 }
 
+// Append REDO and UNDO deltas to DiskRowSetWriter output.
+static Status AppendDeltasToDRS(RollingDiskRowSetWriter* out,
+                              Mutation* new_undos_head,
+                              Mutation* new_redos_head,
+                              RowBlockRow* dst_row) {
+  rowid_t index_in_current_drs;
+
+  if (new_undos_head != nullptr) {
+    // Append UNDO deltas to DiskRowSetWriter output.
+    RETURN_NOT_OK(out->AppendUndoDeltas(dst_row->row_index(),
+                                        new_undos_head,
+                                        &index_in_current_drs));
+  }
+
+  if (new_redos_head != nullptr) {
+    // Append REDO deltas to DiskRowSetWriter output.
+    RETURN_NOT_OK(out->AppendRedoDeltas(dst_row->row_index(),
+                                        new_redos_head,
+                                        &index_in_current_drs));
+  }
+
+  DVLOG(4) << "Output Row: " << dst_row->schema()->DebugRow(*dst_row)
+           << "; RowId: " << index_in_current_drs;
+
+  return Status::OK();
+}
+
+#ifndef NDEBUG
+// Sanity check for UNDO list.
+static void UndoListSanityCheck(Mutation* new_undos_head) {
+      auto* u = new_undos_head;
+      bool is_deleted = false;
+      // The resulting list should have the following invariants:
+      // - deletes can only be observed if not already deleted
+      // - reinserts can only be observed if deleted
+      // - UNDO mutations are in decreasing order
+      while (u != nullptr) {
+        if (u->changelist().is_delete()) {
+          CHECK(!is_deleted);
+          is_deleted = true;
+        } else if (u->changelist().is_reinsert()) {
+          CHECK(is_deleted);
+          is_deleted = false;
+        }
+        if (!u->next()) break;
+        CHECK_GE(u->timestamp(), u->next()->timestamp());
+        u = u->next();
+      }
+}
+#endif // NDEBUG
+
+// For each input row, go through all the REDO mutations and apply those to base row.
+// Generate corresponding UNDO deltas for applied mutations.
+// For a row with 'ghost' entries, merge their histories of mutations.
+// Remove ancient UNDO mutations and check if row is required to be garbage collected.
+// Append REDO and UNDO deltas to DRS output.
+// Do sanity check for final UNDO list.
+static Status ApplyMutationsAndMergeDuplicateHistory(const MvccSnapshot& snap,
+                                                     const CompactionInputRow& 
input_row,
+                                                     size_t cur_row_idx,
+                                                     RowBlock* block,
+                                                     Arena* arena,
+                                                     const string& tablet_id,
+                                                     const 
scoped_refptr<FsErrorManager>& err_mgr,
+                                                     const HistoryGcOpts& 
history_gc_opts,
+                                                     RollingDiskRowSetWriter* 
out,
+                                                     int* live_row_count,
+                                                     bool* 
is_garbage_collected) {
+  RETURN_NOT_OK(out->RollIfNecessary());
+
+  const Schema* schema = input_row.row.schema();
+  DCHECK_SCHEMA_EQ(*schema, out->schema());
+  DCHECK(schema->has_column_ids());
+
+  DVLOG(4) << "Input Row: " << CompactionInputRowToString(input_row);
+
+  RowBlockRow dst_row = block->row(cur_row_idx);
+  RETURN_NOT_OK(CopyRow(input_row.row, &dst_row, 
static_cast<Arena*>(nullptr)));
+
+  // Collect the new UNDO/REDO mutations.
+  Mutation* new_undos_head = nullptr;
+  Mutation* new_redos_head = nullptr;
+
+  // Apply all REDOs to the base row, generating UNDOs for it. This does
+  // not take into account any 'previous_ghost' members.
+  RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
+                                               input_row,
+                                               arena,
+                                               history_gc_opts,
+                                               &dst_row,
+                                               &new_undos_head,
+                                               &new_redos_head));
+
+  // Merge the histories of 'input_row' with previous ghosts, if there are any.
+  RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id,
+                                          err_mgr,
+                                          input_row,
+                                          arena,
+                                          history_gc_opts,
+                                          &new_undos_head));
+
+  // Remove ancient UNDOS and check whether the row should be garbage collected.
+  *is_garbage_collected = RemoveAncientUndos(history_gc_opts,
+                                             new_redos_head,
+                                             &new_undos_head);
+
+  DVLOG(4) << "Output Row: " << RowToString(dst_row, new_redos_head, 
new_undos_head) <<
+      "; Was garbage collected? " << *is_garbage_collected;
+
+  // Skip further processing if this row was garbage collected
+  if (!*is_garbage_collected) {
+    RETURN_NOT_OK(AppendDeltasToDRS(out,
+                                    new_undos_head,
+                                    new_redos_head,
+                                    &dst_row));
+
+    // If the REDO is empty, it should not be a DELETE.
+    if (new_redos_head == nullptr) {
+      (*live_row_count)++;
+    }
+
+#ifndef NDEBUG
+    UndoListSanityCheck(new_undos_head);
+#endif // NDEBUG
+  }
+
+  return Status::OK();
+}
+
 // Following method processes the compaction input by reading input rows in
 // blocks and for each row inside the block:
 // - Apply all REDO mutations collected for the row at hand.
@@ -1414,110 +1541,38 @@ Status FlushCompactionInput(const string& tablet_id,
   while (input->HasMoreBlocks()) {
     RETURN_NOT_OK(input->PrepareBlock(&rows));
 
-    int n = 0;
+    size_t cur_row_idx = 0;
     int live_row_count = 0;
-    for (int i = 0; i < rows.size(); i++) {
-      CompactionInputRow* input_row = &rows[i];
-      RETURN_NOT_OK(out->RollIfNecessary());
-
-      const Schema* schema = input_row->row.schema();
-      DCHECK_SCHEMA_EQ(*schema, out->schema());
-      DCHECK(schema->has_column_ids());
-
-      RowBlockRow dst_row = block.row(n);
-      RETURN_NOT_OK(CopyRow(input_row->row, &dst_row, 
static_cast<Arena*>(nullptr)));
-
-      DVLOG(4) << "Input Row: " << CompactionInputRowToString(*input_row);
-
-      // Collect the new UNDO/REDO mutations.
-      Mutation* new_undos_head = nullptr;
-      Mutation* new_redos_head = nullptr;
-
-      // Apply all REDOs to the base row, generating UNDOs for it. This does
-      // not take into account any 'previous_ghost' members.
-      RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
-                                                   *input_row,
-                                                   &new_undos_head,
-                                                   &new_redos_head,
-                                                   input->PreparedBlockArena(),
-                                                   &dst_row,
-                                                   history_gc_opts));
-
-      // Merge the histories of 'input_row' with previous ghosts, if there are any.
-      RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id,
-                                              error_manager,
-                                              input_row,
-                                              &new_undos_head,
-                                              input->PreparedBlockArena(),
-                                              history_gc_opts));
-
-      // Remove ancient UNDOS and check whether the row should be garbage collected.
+    for (const auto& row : rows) {
       bool is_garbage_collected = false;
-      RemoveAncientUndos(history_gc_opts,
-                         &new_undos_head,
-                         new_redos_head,
-                         &is_garbage_collected);
-
-      DVLOG(4) << "Output Row: " << RowToString(dst_row, new_redos_head, 
new_undos_head) <<
-          "; Was garbage collected? " << is_garbage_collected;
 
+      RETURN_NOT_OK(ApplyMutationsAndMergeDuplicateHistory(snap,
+                                                           row,
+                                                           cur_row_idx,
+                                                           &block,
+                                                           
input->PreparedBlockArena(),
+                                                           tablet_id,
+                                                           error_manager,
+                                                           history_gc_opts,
+                                                           out,
+                                                           &live_row_count,
+                                                           
&is_garbage_collected));
       // Whether this row was garbage collected
       if (is_garbage_collected) {
         // Don't flush the row.
         continue;
       }
 
-      rowid_t index_in_current_drs;
-
-      if (new_undos_head != nullptr) {
-        // Append UNDO deltas to DiskRowSetWriter output.
-        out->AppendUndoDeltas(dst_row.row_index(), new_undos_head, 
&index_in_current_drs);
-      }
-
-      if (new_redos_head != nullptr) {
-        // Append REDO deltas to DiskRowSetWriter output.
-        out->AppendRedoDeltas(dst_row.row_index(), new_redos_head, 
&index_in_current_drs);
-      }
-
-      // If the REDO is empty, it should not be a DELETE.
-      if (new_redos_head == nullptr) {
-        live_row_count++;
-      }
-
-      DVLOG(4) << "Output Row: " << dst_row.schema()->DebugRow(dst_row)
-               << "; RowId: " << index_in_current_drs;
-#ifndef NDEBUG
-      auto* u = new_undos_head;
-      bool is_deleted = false;
-      // The resulting list should have the following invariants:
-      // - deletes can only be observed if not already deleted
-      // - reinserts can only be observed if deleted
-      // - UNDO mutations are in decreasing order
-      while (u != nullptr) {
-        if (u->changelist().is_delete()) {
-          CHECK(!is_deleted);
-          is_deleted = true;
-        } else if (u->changelist().is_reinsert()) {
-          CHECK(is_deleted);
-          is_deleted = false;
-        }
-        if (!u->next()) break;
-        CHECK_GE(u->timestamp(), u->next()->timestamp());
-        u = u->next();
-      }
-#endif // NDEBUG
-
-      n++;
-      if (n == block.nrows()) {
+      if (++cur_row_idx == block.nrows()) {
         // Append fully processed rowblock to DRS writer output.
         RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
         live_row_count = 0;
-        n = 0;
+        cur_row_idx = 0;
       }
     }
 
-    if (n > 0) {
-      block.Resize(n);
+    if (cur_row_idx > 0) {
+      block.Resize(cur_row_idx);
       // Append partially (resized) processed rowblock to DRS writer output.
       RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
       block.Resize(block.row_capacity());
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 0404e06ca..67c42587c 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -203,16 +203,15 @@ struct CompactionInputRow {
 // Function shared by flushes and compactions. Removes UNDO Mutations
 // considered "ancient" from the given CompactionInputRow, modifying the undo
 // mutation list in-place.
-// 'is_garbage_collected': Set to true if the row was marked as deleted prior
-// to the ancient history mark, with no reinsertions after that. In such a
-// case, all traces of the row should be removed from disk by the caller.
+// Return true if the row was marked as deleted prior to the ancient history mark,
+// with no reinsertions after that. In such a case, all traces of the row should
+// be removed from disk by the caller.
 //
 // This is supposed to be called after ApplyMutationsAndGenerateUndos() where REDOS
 // are transformed in UNDOs. There can be at most one REDO in 'redo_head', a DELETE.
-void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
-                        Mutation** undo_head,
+bool RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
                         const Mutation* redo_head,
-                        bool* is_garbage_collected);
+                        Mutation** undo_head);
 
 // Function shared by flushes, compactions and major delta compactions. Applies all the REDO
 // mutations from 'src_row' to the 'dst_row', and generates the related UNDO mutations. Some
@@ -224,11 +223,11 @@ void RemoveAncientUndos(const HistoryGcOpts& 
history_gc_opts,
 //                            ignored.
 Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
                                       const CompactionInputRow& src_row,
-                                      Mutation** new_undo_head,
-                                      Mutation** new_redo_head,
                                       Arena* arena,
+                                      const HistoryGcOpts& history_gc_opts,
                                       RowBlockRow* dst_row,
-                                      const HistoryGcOpts& history_gc_opts);
+                                      Mutation** new_undo_head,
+                                      Mutation** new_redo_head);
 
 // Iterate through this compaction input, flushing all rows to the given RollingDiskRowSetWriter.
 // The 'snap' argument should match the MvccSnapshot used to create the compaction input.
diff --git a/src/kudu/tablet/delta_compaction.cc 
b/src/kudu/tablet/delta_compaction.cc
index 6a582530d..dc612ef1c 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -179,16 +179,16 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const 
IOContext* io_context) {
       DVLOG(3) << "MDC Input Row - RowId: " << row_id << " "
                << CompactionInputRowToString(*input_row);
 
-      // NOTE: This is presently ignored.
-      bool is_garbage_collected;
-
-      RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(
-          snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena,
-          &dst_row, history_gc_opts_));
+      RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
+                                                   *input_row,
+                                                   &mem.arena,
+                                                   history_gc_opts_,
+                                                   &dst_row,
+                                                   &new_undos_head,
+                                                   &new_redos_head));
       RemoveAncientUndos(history_gc_opts_,
-                         &new_undos_head,
                          new_redos_head,
-                         &is_garbage_collected);
+                         &new_undos_head);
 
       DVLOG(3) << "MDC Output Row - RowId: " << row_id << " "
                << RowToString(dst_row, new_undos_head, new_redos_head);

Reply via email to