[email protected] has posted comments on this change. (http://gerrit.cloudera.org:8080/19475)

Change subject: IMPALA-11886: Data cache should support asynchronous writes
......................................................................


Patch Set 4:

(9 comments)

Thanks for the code review!

http://gerrit.cloudera.org:8080/#/c/19475/4//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19475/4//COMMIT_MSG@24
PS4, Line 24: Testing:
> What circumstances show better performance?

How should I demonstrate that? Would posting the results of a comparative benchmark be enough? I don't have any experience with this, so please give me some advice.


http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache-test.cc
File be/src/runtime/io/data-cache-test.cc:

http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache-test.cc@106
PS4, Line 106:     while (cache.current_buffer_size_.Load() != 0) continue;
> Let's add a short sleep so that this is not spinning.
Done
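A minimal sketch of the non-spinning wait, assuming `std::atomic<int64_t>` as a stand-in for the test's `cache.current_buffer_size_` (which is Impala's `AtomicInt64`) and a hypothetical 1 ms poll interval. `WaitForBuffersReleased` is an illustrative helper, not the test's actual code:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical test helper: blocks until all asynchronous store tasks have
// released their buffers. Sleeping between polls keeps the waiting thread from
// spinning on a CPU core. Returns how many times it had to sleep.
int WaitForBuffersReleased(const std::atomic<int64_t>& current_buffer_size,
    std::chrono::milliseconds poll_interval = std::chrono::milliseconds(1)) {
  assert(poll_interval.count() > 0);
  int sleeps = 0;
  while (current_buffer_size.load() != 0) {
    std::this_thread::sleep_for(poll_interval);
    ++sleeps;
  }
  return sleeps;
}
```

A short sleep adds at most one poll interval of latency to the test, which is negligible compared with the time a store task takes.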


http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache.h
File be/src/runtime/io/data-cache.h:

http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache.h@271
PS4, Line 271:     uint8_t* buffer_;
> I think it would be cleaner for this to be a unique_ptr.

Did you mean using std::unique_ptr<uint8_t[]> instead of uint8_t*? I tried that, but it did not reduce the amount of code. Did I misunderstand something?
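For comparison, a minimal sketch of what the `unique_ptr` version might look like. It assumes a simplified, hypothetical `StoreTask` without the cache key and uses `std::atomic<int64_t>` in place of `AtomicInt64`. The line count is indeed similar; what changes is that ownership of the buffer is explicit, the destructor no longer needs `delete[]`, and `DISALLOW_COPY_AND_ASSIGN` becomes redundant because `std::unique_ptr` is not copyable:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>

// Hypothetical, simplified StoreTask: the cache key is omitted and
// std::atomic<int64_t> stands in for Impala's AtomicInt64.
class StoreTask {
 public:
  // Copies 'buffer' into a newly allocated buffer and charges its size to
  // 'total_size'. Plain new[] is used instead of std::make_unique<uint8_t[]>,
  // which would zero-fill memory that is overwritten immediately anyway.
  StoreTask(const uint8_t* buffer, int64_t buffer_len, std::atomic<int64_t>& total_size)
    : buffer_(new uint8_t[buffer_len]),
      buffer_len_(buffer_len),
      total_size_(total_size) {
    assert(buffer != nullptr);
    std::memcpy(buffer_.get(), buffer, buffer_len_);
    total_size_.fetch_add(buffer_len_);
  }

  // Only the accounting is left here; unique_ptr frees the buffer itself.
  ~StoreTask() { total_size_.fetch_sub(buffer_len_); }

  const uint8_t* buffer() const { return buffer_.get(); }
  int64_t buffer_len() const { return buffer_len_; }

 private:
  std::unique_ptr<uint8_t[]> buffer_;
  int64_t buffer_len_;
  std::atomic<int64_t>& total_size_;
};
```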


http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache.h@203
PS4, Line 203:
             :   /// The key used for lookups in the cache.
             :   struct CacheKey {
             :    public:
             :     explicit CacheKey(const string& filename, int64_t mtime, int64_t offset)
             :       : key_(filename.size() + sizeof(mtime) + sizeof(offset)) {
             :       DCHECK_GE(mtime, 0);
             :       DCHECK_GE(offset, 0);
             :       key_.append(&mtime, sizeof(mtime));
             :       key_.append(&offset, sizeof(offset));
             :       key_.append(filename);
             :     }
             :
             :     int64_t Hash() const {
             :       return HashUtil::FastHash64(key_.data(), key_.size(), 0);
             :     }
             :
             :     Slice filename() const {
             :       return Slice(key_.data() + OFFSETOF_FILENAME, key_.size() - OFFSETOF_FILENAME);
             :     }
             :
             :     int64_t mtime() const {
             :       return UNALIGNED_LOAD64(key_.data() + OFFSETOF_MTIME);
             :     }
             :
             :     int64_t offset() const {
             :       return UNALIGNED_LOAD64(key_.data() + OFFSETOF_OFFSET);
             :     }
             :
             :     Slice ToSlice() const {
             :       return key_;
             :     }
             :
             :    private:
             :     // Key encoding stored in key_:
             :     //
             :     //  int64_t mtime;
             :     //  int64_t offset;
             :     //  <variable length bytes> filename;
             :     static constexpr int OFFSETOF_MTIME = 0;
             :     static constexpr int OFFSETOF_OFFSET = OFFSETOF_MTIME + sizeof(int64_t);
             :     static constexpr int OFFSETOF_FILENAME = OFFSETOF_OFFSET + sizeof(int64_t);
             :     kudu::faststring key_;
             :   };
             :
             :   /// The class to abstract store behavior, including copying the buffer and holding it
             :   /// until the store completes.
             :   class StoreTask {
             :    public:
             :     /// Creating a store task requires the filename, mtime and offset that constitute
             :     /// the cache key, as well as the buffer and length of the cached data. We allocate
             :     /// a new buffer in the constructor, copy the cache data into it and update
             :     /// total_size, which keeps track of the total buffer size allocated by all store
             :     /// tasks.
             :     explicit StoreTask(const std::string& filename, int64_t mtime, int64_t offset,
             :         const uint8_t* buffer, int64_t buffer_len, AtomicInt64& total_size);
             :
             :     /// When the store task is destroyed, the allocated buffer is freed and total_size
             :     /// is updated.
             :     ~StoreTask();
             :
             :     const CacheKey& key() const { return key_; }
             :     const uint8_t* buffer() const { return buffer_; }
             :     int64_t buffer_len() const { return buffer_len_; }
             :
             :    private:
             :     DISALLOW_COPY_AND_ASSIGN(StoreTask);
             :
             :     CacheKey key_;
             :     uint8_t* buffer_;
             :     int64_t buffer_len_;
             :     AtomicInt64& total_size_;
             :   };
> Small thing: It would be nice to keep these structures defined in
 > data-cache.cc. The way to do that is to have a forward declaration
 > of CacheKey and StoreTask here in data-cache.h, move these
 > definitions to data-cache.cc, then also move the definition of
 > DataCache's constructor and destructor to data-cache.cc.
 >
 > Here in data-cache.h:
 > explicit DataCache(const std::string config, bool trace_replay = false);
 >
 > ~DataCache();
 >
 > Over in data-cache.cc (a good spot would be right above the
 > definition of DataCache::Init):
 >
 > DataCache::DataCache(const std::string config, bool trace_replay)
 > : config_(config), trace_replay_(trace_replay) { }
 >
 > DataCache::~DataCache() { ReleaseResources(); }

That would be nicer, and I initially intended to do it that way, but forward-declaring StoreTask caused a compile error: the std::unique_ptr<StoreTask> used later in the header would then refer to an incomplete type. That is why I had to keep the definition here.
Perhaps the pImpl idiom could solve this problem, like this:

in data-cache.h:
class StoreTask;
struct StoreTaskHandle {
        StoreTaskHandle();
        ~StoreTaskHandle();
        StoreTaskHandle(StoreTaskHandle&& other);
        StoreTaskHandle& operator=(StoreTaskHandle&& other);
        StoreTaskHandle(const StoreTask* task);
        const StoreTask* operator->() const;
        std::unique_ptr<const StoreTask> task;
};

in data-cache.cc:
DataCache::StoreTaskHandle::StoreTaskHandle() = default;
DataCache::StoreTaskHandle::~StoreTaskHandle() = default;
DataCache::StoreTaskHandle::StoreTaskHandle(StoreTaskHandle&& other) = default;
DataCache::StoreTaskHandle& DataCache::StoreTaskHandle::operator=(
    StoreTaskHandle&& other) = default;
DataCache::StoreTaskHandle::StoreTaskHandle(const StoreTask* task) : task(task) { }
const DataCache::StoreTask* DataCache::StoreTaskHandle::operator->() const {
  return task.get();
}

But I don't think it's any more aesthetically pleasing.  What do you think?
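For reference, the general C++ rule behind the reviewer's suggestion can be shown in one file: a `std::unique_ptr` member may point to a forward-declared type as long as every function that can destroy it (here the constructor and destructor) is defined where the type is complete. `Owner` and `Task` are hypothetical stand-ins for `DataCache` and `StoreTask`; whether this is enough for data-cache.h depends on whether anything else in the header destroys a `unique_ptr<StoreTask>` inline:

```cpp
#include <cassert>
#include <memory>

// ---- What would live in the header (hypothetical names) ----
class Task;  // Forward declaration only: Task is incomplete here.

class Owner {
 public:
  // Both are declared here and defined below, after Task is complete. If they
  // were defined inline in a header that only forward-declares Task, every file
  // including that header would instantiate ~unique_ptr<Task> with an incomplete
  // type, which does not compile.
  Owner();
  ~Owner();
  int TaskValue() const;

 private:
  std::unique_ptr<Task> task_;
};

// ---- What would live in the .cc file ----
class Task {
 public:
  explicit Task(int value) : value_(value) {}
  int value() const { return value_; }

 private:
  int value_;
};

Owner::Owner() : task_(new Task(42)) {}
Owner::~Owner() = default;  // Task is complete here, so this compiles.
int Owner::TaskValue() const { return task_->value(); }
```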


http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache.cc
File be/src/runtime/io/data-cache.cc:

http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache.cc@117
PS4, Line 117: DEFINE_int32(data_cache_num_write_threads, 0,
             :     "Number of data cache write threads. Write threads write to the cache "
             :     "asynchronously after an IO thread reads the data, so the IO thread can return "
             :     "more quickly. Asynchronous writes need extra memory to hold the temporary "
             :     "buffers, which is bounded by data_cache_write_buffer_limit. If this is 0, "
             :     "writes are synchronous.");
             : DEFINE_string(data_cache_write_buffer_limit, "1GB",
             :     "Limit of the total buffer size used by asynchronous store tasks.");
> For the startup parameter names, it would be good to include
 > "async" in the name so that it is clear that these only apply to
 > the async write functionality. i.e. data_cache_num_async_write_threads
 > and data_cache_async_write_buffer_limit.
 >
 > A separate question: How would someone decide what the right values
 > are?

I think a good default for the number of threads is data_cache_write_concurrency, so that entries are not dropped because of the concurrency limit. However, these threads are shared by all partitions, so another option is to use data_cache_write_concurrency multiplied by the number of partitions. What do you think?
I don't have a good idea of how to choose the buffer size limit. Do you have any suggestions?
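To make the thread-count question concrete, here is a minimal sketch of a fixed pool of asynchronous write threads draining one shared queue of store tasks. It is not the patch's implementation: `AsyncWriterPool`, `Submit()`, `Drain()` and the use of `std::function` tasks are hypothetical. Because the queue is shared, the pool size caps the total number of concurrent writes across all partitions, which is the trade-off between the two options above:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool of asynchronous write threads. Tasks from all
// partitions share one queue, so at most 'num_threads' writes run at once.
class AsyncWriterPool {
 public:
  explicit AsyncWriterPool(int num_threads) {
    assert(num_threads > 0);  // 0 would mean "synchronous writes" instead.
    for (int i = 0; i < num_threads; ++i) {
      threads_.emplace_back([this] { WorkerLoop(); });
    }
  }

  // Lets the workers finish any queued tasks, then joins them.
  ~AsyncWriterPool() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    work_cv_.notify_all();
    for (std::thread& t : threads_) t.join();
  }

  // Enqueues a store task and returns immediately, as an IO thread would.
  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_.push_back(std::move(task));
      ++pending_;
    }
    work_cv_.notify_one();
  }

  // Blocks until every submitted task has finished running.
  void Drain() {
    std::unique_lock<std::mutex> l(lock_);
    idle_cv_.wait(l, [this] { return pending_ == 0; });
  }

 private:
  void WorkerLoop() {
    while (true) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> l(lock_);
        work_cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
        if (queue_.empty()) return;  // Shutting down and nothing left to do.
        task = std::move(queue_.front());
        queue_.pop_front();
      }
      task();  // The actual cache write runs outside the lock.
      std::lock_guard<std::mutex> l(lock_);
      if (--pending_ == 0) idle_cv_.notify_all();
    }
  }

  std::mutex lock_;
  std::condition_variable work_cv_;
  std::condition_variable idle_cv_;
  std::deque<std::function<void()>> queue_;
  size_t pending_ = 0;  // Tasks queued or currently running.
  bool shutdown_ = false;
  std::vector<std::thread> threads_;
};
```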


http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache.cc@353
PS4, Line 353:   if (LIKELY(buffer != nullptr)) {
> If the buffer is nullptr, then there is nothing to store. Let's DCHECK that
Done
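A minimal sketch of the change, with `assert` standing in for glog's `DCHECK` and a hypothetical, simplified `CopyForStore()` in place of the real store path (a `std::vector` stands in for the store task's buffer):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified store path.
std::vector<uint8_t> CopyForStore(const uint8_t* buffer, int64_t buffer_len) {
  // Before: if (LIKELY(buffer != nullptr)) { ... } silently skipped null buffers.
  // After: a null buffer is a caller bug, so it is checked (DCHECK in Impala)
  // and the branch disappears from the normal path.
  assert(buffer != nullptr);
  assert(buffer_len >= 0);
  return std::vector<uint8_t>(buffer, buffer + buffer_len);
}
```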


http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/runtime/io/data-cache.cc@987
PS4, Line 987:     "current buffer size: $0 size limitation: $1 require: $2",
             :     current_buffer_size_.Load(), store_buffer_capacity_, buffer_len);
> Nit: For these two lines, please indent by 4 spaces
Done
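For context on the message above, a minimal sketch of how an asynchronous store could reserve space against the buffer limit before allocating. It assumes a compare-and-swap loop on `std::atomic<int64_t>` (the patch uses Impala's `AtomicInt64`); `TryReserveBuffer` and `ReleaseBuffer` are hypothetical names. When the reservation fails, the caller can skip the entry and log the three values in the message:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper: atomically adds 'len' to 'current' unless that would
// exceed 'capacity'. Returns false and leaves 'current' unchanged on failure.
bool TryReserveBuffer(std::atomic<int64_t>& current, int64_t capacity, int64_t len) {
  assert(len > 0);
  int64_t observed = current.load();
  while (true) {
    if (observed + len > capacity) return false;  // Over the write buffer limit.
    // On failure, compare_exchange_weak reloads 'observed' and the loop retries.
    if (current.compare_exchange_weak(observed, observed + len)) return true;
  }
}

// Hypothetical helper: releases a reservation once the store task has finished.
void ReleaseBuffer(std::atomic<int64_t>& current, int64_t len) {
  int64_t previous = current.fetch_sub(len);
  assert(previous >= len);
  (void)previous;  // Avoid an unused-variable warning when NDEBUG is set.
}
```

A simpler alternative is `fetch_add` followed by a rollback when the limit is exceeded, but then concurrent callers can observe a transiently inflated total and be rejected spuriously; the CAS loop avoids that at the cost of occasional retries.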


http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/util/impalad-metrics.cc
File be/src/util/impalad-metrics.cc:

http://gerrit.cloudera.org:8080/#/c/19475/4/be/src/util/impalad-metrics.cc@84
PS4, Line 84: const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ACTIVE_BUFFER_BYTES =
            :     "impala-server.io-mgr.remote-data-cache-active-buffer-bytes";
            : const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_STORE_TASKS_CREATED =
            :     "impala-server.io-mgr.remote-data-cache-store-tasks-created";
            : const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_OUT_OF_BUFFER_LIMIT_BYTES =
            :     "impala-server.io-mgr.remote-data-cache-out-of-buffer-limit-bytes";
> From the point of view of naming, it would be nice if the names make it cle
Done


http://gerrit.cloudera.org:8080/#/c/19475/4/common/thrift/metrics.json
File common/thrift/metrics.json:

http://gerrit.cloudera.org:8080/#/c/19475/4/common/thrift/metrics.json@661
PS4, Line 661:
> Tiny thing: can we remove the trailing spaces? (Here and line 671)
Done



--
To view, visit http://gerrit.cloudera.org:8080/19475
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I878f7486d485b6288de1a9145f49576b7155d312
Gerrit-Change-Number: 19475
Gerrit-PatchSet: 4
Gerrit-Owner: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Joe McDonnell <[email protected]>
Gerrit-Comment-Date: Thu, 16 Feb 2023 03:24:09 +0000
Gerrit-HasComments: Yes
