Hi,

I think the proposed feature is interesting, and after reviewing the original discussion thread, I'd like to help move it forward. To that end, I've summarized the key points of the discussion so far, including the challenges highlighted in the thread and some issues identified off-list, along with potential solutions. My colleague will share a new POC patch set that has been developed along these lines. (Note: the previous email was blocked, so this is a second attempt.)
The original proposal in this thread suggests filtering out changes that won't be published before decoding them. This approach could prevent the serialization of numerous unnecessary records to disk when using non-streaming mode for large transactions. I find this to be a valid proposal. Although a recent commit has enabled streaming mode for built-in logical replication by default, this strategy could also save memory and CPU cycles in streaming mode, especially when most changes can be filtered out.

While the proposal is interesting, several issues have been identified both in this thread and off-list:

- Performance

Determining whether to filter a change requires information about the relation or publication from the catalog, which requires a transaction to be started. When most changes in a transaction are unfilterable, the overhead of starting a transaction for each record is significant. By itself, this is not a big issue, because we can cache whether a change to a given table is filterable using a hash table. However, even a single hash search may not be sufficiently cheap if it is performed for every record before decoding. In my tests, this caching approach produced noticeable overhead (around 4%), mainly due to the cost of computing the hash key and the function calls in a hot path:

   --4.86%--hash_search
             |--3.55%--tag_hash
             |          --2.96%--hash_bytes
              --0.37%--hash_search_with_hash_value

Such overhead may not be acceptable for all users, given valid use cases like publishing most tables on an instance (e.g., during an upgrade using logical replication), where this filtering wouldn't provide a benefit and would only incur overhead.

A simple approach to minimize this overhead is to suspend filtering for a certain period when an unfilterable change is encountered. In other words, continue filtering changes (using the hash cache) as long as the last record was successfully filtered out; once an unfilterable change is found, skip filtering for the next 100 (an arbitrary number) changes. This aims to reduce frequent hash searches when most changes can't be filtered out, and it lowered the overhead to less than 1% on my machine. (This is a simple idea, and better algorithms could exist.)

- Snapshot construction

The main challenge discussed in this thread is constructing a correct historic snapshot for filtering purposes. Creating a historic snapshot while decoding an in-progress transaction is feasible, as it's already done in streaming mode. However, there could be pending records (INTERNAL_SNAPSHOT, COMMAND_ID, or INVALIDATION) that will update the snapshot being built, and we shouldn't use the current snapshot without processing these records. But, looking at it from another perspective, why not perform filtering only when there are no such pending records? By tracking whether the decoder has any of these records pending, we could skip filtering while they are present. These special records are generally generated only during DDL execution, which shouldn't be frequent, so the filter could still benefit many scenarios.

- New output plugin callback

To perform filtering, a new callback (e.g., change_filter) would need to be invoked before decoding each record. Using pgoutput as an example, we would call get_rel_sync_entry() in the callback to determine whether a change to a given table can be filtered out. get_rel_sync_entry() requires catalog access if it doesn't find the entry in its hash table, but, as mentioned in the "Performance" section, we need to minimize catalog access and transaction start/stop.

So, ideally, the callback should return whether it needs catalog access, allowing the caller (the reorderbuffer) to start a transaction only when necessary. The callback interface could be as follows (the function and parameter names can be refined; the example just shows the input and output information):

typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
                                             Oid relid,
                                             ReorderBufferChangeType type,
                                             bool in_txn,
                                             bool *cache_valid);

The callback returns true to signal that decoding should be skipped for the given change type, and false otherwise. The in_txn parameter indicates whether the callback is invoked within a transaction block. When in_txn is false and making the filtering decision requires being inside a transaction block (for example, because catalog access is needed), the callback sets *cache_valid to false. The caller should invoke the callback for each record, initially without starting a transaction; if the returned *cache_valid is false, the caller should start a transaction and invoke the callback again.
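To make the intended calling convention more concrete, here is a rough sketch of the caller side. This is illustrative only and not taken from the POC patch; the function name ReorderBufferFilterChange, the filter_change_cb field, and the transaction handling details are hypothetical placeholders.

/*
 * Illustrative sketch only: ask the output plugin whether the given change
 * can be filtered out, first without a transaction, and retry inside a
 * transaction only if the plugin reports that it could not answer from its
 * cache (i.e. catalog access would be required).
 */
static bool
ReorderBufferFilterChange(LogicalDecodingContext *ctx, Oid relid,
                          ReorderBufferChangeType type)
{
    bool        cache_valid;
    bool        filter;

    /* First attempt: no transaction, rely on the plugin's cached state. */
    filter = ctx->callbacks.filter_change_cb(ctx, relid, type,
                                             false /* in_txn */, &cache_valid);
    if (cache_valid)
        return filter;

    /*
     * The plugin needs catalog access, so provide a transaction and ask
     * again. (Setting up a suitable historic snapshot for the catalog access
     * is omitted here.)
     */
    StartTransactionCommand();
    filter = ctx->callbacks.filter_change_cb(ctx, relid, type,
                                             true /* in_txn */, &cache_valid);
    AbortCurrentTransaction();

    return filter;
}

With this shape, the reorderbuffer doesn't need to know anything about how the output plugin caches its decisions; it only learns, per call, whether a transaction (and catalog access) is required.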
Another alternative approach, in Li Jie's patch[1], is to have the output plugin callback provide up front all the record types that can be published for a relation and to cache them at the reorderbuffer level. However, I think this isn't feasible, because the reorderbuffer isn't (and needn't be) aware of when to invalidate such a cache. Although we could register a cache invalidation callback the way pgoutput does in init_rel_sync_cache(), many third-party output plugins could have their own caching mechanisms, making it impossible for the reorderbuffer to simulate cache management for all of them. Therefore, what the reorderbuffer should do, in my opinion, is directly ask the output plugin whether a result can be obtained without entering a transaction, as per the interface I mentioned earlier.

[1] https://www.postgresql.org/message-id/CAGfChW7XpZGkxOpHZnv69%2BH_AyKzbffcrumyNu4Fz0u%2B1ADPxA%40mail.gmail.com

Best Regards,
Hou zj