Alexey Serbin has posted comments on this change.

Change subject: async background flush provision for C++ client

Patch Set 2:


Thank you for the review.

I addressed most of the comments.  I'm working on testing the change more 
thoroughly (more tests will follow), and I'm also looking at another option 
for implementing background flushing: Adar recommended looking into using one 
of the reactor threads for that purpose.

I'll post an update on this change, but more changes are expected to appear 

Commit Message:

PS2, Line 7: provision
> nit: remove provision

Line 7: async background flush provision for C++ client
> nit: capitalize (Async)

PS2, Line 10: is
> nit: avoid using the passive voice. i.e. "KuduSession starts a background f

PS2, Line 13: virtual
> what's a virtual buffer?
There isn't a single buffer as such; there is a set of buffers used by 
multiple batchers.  Will add a clarifying comment.

Line 16: The flush criteria is based on buffer size for not-yet-scheduled
> I'm having trouble parsing this sentence.

PS2, Line 17: operatations
> nit: typo

PS2, Line 20: operatations
> nit: typo

PS2, Line 23: KUDU
> Mention this issue in the commit message title.
File src/kudu/client/

Line 267: #ifndef NDEBUG
> spurious change
It's not spurious: it fixes a compilation warning about unused variables for debug 
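For context, a minimal sketch of the general pattern (hypothetical function and variable names, not the code in this change): a variable that is read only inside assert() becomes unused when NDEBUG is defined, because assert() then expands to nothing.  Guarding its declaration with #ifndef NDEBUG avoids the unused-variable warning.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical example: `prev_size` is only read by assert(), which expands
// to nothing when NDEBUG is defined.  Declaring it under #ifndef NDEBUG keeps
// NDEBUG builds free of unused-variable warnings.
std::size_t AppendAndCount(std::vector<int>* v, int value) {
#ifndef NDEBUG
  const std::size_t prev_size = v->size();
#endif
  v->push_back(value);
  assert(v->size() == prev_size + 1);
  return v->size();
}
```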

Line 383: 
> spurious change

Line 553:   //        since the other time it's called in KuduSession::Apply()
> nit: rephrase. How about:
File src/kudu/client/batcher.h:

Line 109:   // Return number of bytes used for batcher buffer
> nit: "Returns the"
I see that most of the comments in this file do not use the third-person 
singular form; the only exception is external_consistency_mode().  Are you 
sure it should be 'Returns the number' rather than 'Return the number'?

PS2, Line 216: //
> no need for this comment
File src/kudu/client/

Line 1991: // return an error with session running in any supported flush mode
> nit: period

PS2, Line 1993: static
> no need for static.

PS2, Line 1993: buffer_space_bytes_limit
> style: kBufferSpaceBytesLimit or BUFFER_SPACE_BYTES_LIMIT

PS2, Line 1994: static
> same

Line 1995:   static const KuduSession::FlushMode modes[] = {
> same

Line 2001:   for (const auto mode: modes) {
> isn't 'auto' already const ?

Line 2007:     ASSERT_TRUE(s.IsIncomplete()) << "got unexpected status: " << 
> nit: capitalize Got

Line 2014:   // Applying a bunch of small rows without a flush should not result in
> this sentence is huge and hard to read. please break it down.

Line 2016:   // since there is a flow control which makes Session::Apply() block and wait
> what: "a flow control"?

Line 2018:   {
> no need for the extra { scope

Line 2023:       ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 1, 1, "x"));
> How this testing that Apply() is eventually blocking, or making sure that t
This test verifies that there is no error about overrunning the buffer.  
You're right that it doesn't explicitly verify that Apply() blocks; I'll 
address that separately.

Line 2029: // operations are put into the queue flushed after some time.
> you mean : "into the queue and flushed" right?

Line 2030: // The exact timeout interval is implementation-dependent,
> which timeout interval?

Line 2031: // but 100 ms is a good upper limit anyways.
> s/anyways/anyway

Line 2033:   {
> no need for the extra {

Line 2041:     SleepFor(MonoDelta::FromMilliseconds(100));
> loop instead of wait, otherwise this will be brittle in jenkins
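One way to follow the "loop instead of wait" suggestion is a polling helper with a deadline.  WaitFor() below is a hypothetical helper written with the standard library, not an existing Kudu test utility.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: poll `condition` until it holds or `timeout` expires,
// instead of a single fixed sleep that may be too short on a loaded host.
bool WaitFor(const std::function<bool()>& condition,
             std::chrono::milliseconds timeout,
             std::chrono::milliseconds poll_interval =
                 std::chrono::milliseconds(10)) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (true) {
    if (condition()) return true;
    if (std::chrono::steady_clock::now() >= deadline) return false;
    std::this_thread::sleep_for(poll_interval);
  }
}
```

In the test, the condition would check some observable effect of the background flush (for example, the number of rows visible in the table), with a generous timeout so that a slow Jenkins host does not cause spurious failures.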
File src/kudu/client/

Line 705:           "Cannot change flush mode when writes are buffered");
> nit period

Line 707:   data_->SetFlushMode(m);
> would it simplify things to now allow the flush mode to change if there is 
I think we can allow the flush mode to change regardless of the presence of 
buffered/pending write operations, but that requires proper design and some 
additional logic.  I will try to address this in a separate 

Line 728:   return data_->SetBufferBytesLimit(size);
> why not use the same method name?
Because from the internal perspective there is no notion of a mutation: it's 
just a buffer that accommodates the byte representation of write operations.

On the other hand, the outer (API-facing) name cannot be changed now without 
breaking existing clients/code.

Does that make sense?
File src/kudu/client/client.h:

Line 678:     // The Flush() call can be used to block until the latest batch is sent.
> maybe add/replace: "a batch is sent and the buffer has available space agai
File src/kudu/client/

Line 48:   // intentionally left empty
> no need for this, you can also join the brackets in the line above, i.e. {}

Line 53:     WARN_NOT_OK(Stop(), "unable to stop AUTO_FLUSH_BACKGROUND thread");
> capitalize. Also a WARN? what happens if this fails?
Well, IMO capitalizing error messages and adding periods to them doesn't look 
great: it leaves spurious capital letters and periods in the middle of the 
resulting message, since these messages are already embedded in an error 
context.  The result looks something like:

W0718 19:43:27.331773 21454848] Unable to stop AUTO_FLUSH_BACKGROUND thread.: Invalid argument: Can't join on own thread: 

However, I see this convention is used everywhere, so it makes sense to keep 
things consistent; will capitalize.

As for the consequences: in case of failure, I suspect unpredictable things 
might happen, because the Thread destructor is called afterwards and it calls 
pthread_detach(); if the thread is left running, that may lead to 
unpredictable results.

You are right: that's essentially an unrecoverable error.  Will replace it 
with CHECK().

Line 58:   CHECK(!thread_) << "has already started";
> complete the sentence

Line 69:   while (true) {
> Overall I think this logic can be simplified
All right, I'll take a look, thanks.

PS2, Line 75: ||
> nit: style, move this to the line below, same elsewhere

Line 85:         // high watermark is 1/2 of the buffer space limit for non-flushed data
> can you use a flag for the over_watermark that you set as a percentage of t
Good idea -- will do.
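A sketch of deriving the high watermark from a percentage, assuming the percentage comes from a flag (for example one defined with gflags' DEFINE_int32).  The function names are hypothetical; a value of 50 reproduces the current "1/2 of the buffer space limit" behavior.  The limit is split into quotient and remainder by 100 so the multiplication cannot overflow even for very large limits.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical: floor(buffer_limit_bytes * watermark_pct / 100), with the
// percentage clamped to [0, 100] and computed without overflow.
std::size_t HighWatermarkBytes(std::size_t buffer_limit_bytes,
                               int watermark_pct) {
  if (watermark_pct < 0) watermark_pct = 0;
  if (watermark_pct > 100) watermark_pct = 100;
  const std::size_t pct = static_cast<std::size_t>(watermark_pct);
  return buffer_limit_bytes / 100 * pct + buffer_limit_bytes % 100 * pct / 100;
}

// Hypothetical predicate the flusher could use to decide to flush early.
bool OverHighWatermark(std::size_t buffered_bytes,
                       std::size_t buffer_limit_bytes, int watermark_pct) {
  return buffered_bytes > HighWatermarkBytes(buffer_limit_bytes, watermark_pct);
}
```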

Line 114:       sp::shared_ptr<KuduSession> session(weak_session_.lock());
> why not just increase the ref count?
I might be missing something, but that's exactly what it does: it creates a 
shared_ptr from the weak_ptr, which increases the ref count.  Or did you mean 
something else?
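To illustrate the ref-count point with plain std:: types (the helper name is hypothetical): lock() on a weak_ptr returns a shared_ptr that holds an extra strong reference for as long as it is alive, or an empty pointer if the object is already gone.

```cpp
#include <cassert>
#include <memory>

// Returns the strong reference count observed while holding the promoted
// pointer, or 0 if the object has already been destroyed.
long StrongRefsWhileLocked(const std::weak_ptr<int>& weak) {
  std::shared_ptr<int> strong = weak.lock();  // bumps use_count() if alive
  return strong ? strong.use_count() : 0;
}  // `strong` goes out of scope here and the count drops back
```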

Line 138:   is_explicit_flush_ = true;
> could you get rid of this bool and just call the cb if it is set?
I want the client code to be able to request a flush while passing a null 
callback (so cb would be nullptr, and the fact that an explicit flush was 
requested has to be stored elsewhere).  That's why there are separate boolean 
and callback variables.
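A minimal sketch of keeping the explicit-flush request separate from the possibly-null callback.  FlushRequest and TakeRequest() are hypothetical names, ForceFlush() is only loosely modeled on the method of the same name in this change, and the status is simplified to a std::string instead of kudu::Status.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

class FlushRequest {
 public:
  using Callback = std::function<void(const std::string& status)>;

  // Records that an explicit flush was requested; `cb` may be empty.
  void ForceFlush(Callback cb) {
    std::lock_guard<std::mutex> lock(mutex_);
    is_explicit_flush_ = true;
    cb_ = std::move(cb);
  }

  // Called by the background thread: returns whether a flush was requested
  // and hands over the (possibly empty) callback, resetting both.
  bool TakeRequest(Callback* cb) {
    std::lock_guard<std::mutex> lock(mutex_);
    const bool requested = is_explicit_flush_;
    is_explicit_flush_ = false;
    *cb = std::move(cb_);
    cb_ = nullptr;
    return requested;
  }

 private:
  std::mutex mutex_;
  bool is_explicit_flush_ = false;
  Callback cb_;
};
```

The flag alone answers "should I flush now?", while the callback is invoked only if the caller supplied one.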

Line 149:       buffer_bytes_limit_(7 * 1024 * 1024),  // TODO: clarify on this
> use a FLAG for this
Good idea -- will do, thanks.

Line 167:   WARN_NOT_OK(flusher_->Start(),
> shouldn't this return an error to the caller

Line 198:   // Both flush and flow control logic need to know on buffer limit 
> nit: remove "on"
OK, I see.  But wouldn't it be too restrictive?

Line 206: Status KuduSession::Data::CheckAgainstBufferLimit(size_t required_size) const {
> nit: remove "Against"

Line 222:     WARN_NOT_OK(flusher_->ForceFlush(cb), "failed to wake background 
> isn't this serious enough that the caller should get an error?
Yes, the error should be properly reported via the callback, if provided.  Will 
fix, thanks.

Line 335:     // because the background flusher could trigger spurios flush 
> typo
File src/kudu/client/session-internal.h:

PS2, Line 53: is set on currently
> I don;t think you need to mention what the caller will pass as a flush mode
That's the output parameter, actually.  I'll update the comment for clarity.

Line 58:   // Set limit on buffer space consumed by buffered write operations
> nit period here and elsewhere

Line 65:   // calling the callback with the flush result
> nit period

Line 105:   void LessBufferedBytes(int64_t bytes_used);
> less seems to imply comparison. Can you rename to DecreaseBufferedBytes

Line 107:   // Check if the specified size fits buffer given its current size 
> period at the end of this sentence and all the other ones.

Line 110:   // Mutex for the condition variables below
> this seems to be used for more than the ConditionVariables.
That's a good catch.  Will update the comment.

Line 129:   class BackgroundFlusher {
> move the inner class above the other private methods and fields.

Line 137:     // initialize the object and start the thread
> capitalize here and elsewhere

Line 151:     client::sp::weak_ptr<KuduSession> weak_session_;
> why isn't this hanging on to the KuduSession. What happens if it gets delet
Holding a shared pointer to the KuduSession would create a reference cycle 
here (the flusher is owned by KuduSession::Data, which in turn is owned by 
KuduSession itself), so the session would never be released.

Holding just a raw pointer to KuduSession is not enough, since the flusher 
creates new batchers, and those need a shared pointer to KuduSession.

If KuduSession is being destroyed, the BackgroundFlusher instance is destroyed 
along with it as a sub-sub-object of KuduSession, so it's not a problem.
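A minimal model of this ownership chain with plain std:: types.  Session and Flusher are hypothetical stand-ins for KuduSession and BackgroundFlusher, and the intermediate KuduSession::Data level is collapsed.  Because the flusher holds only a weak_ptr, the session's use_count() stays at 1 and resetting the last owner destroys it; with a shared_ptr member instead, use_count() would be 2 and the destructor would never run.

```cpp
#include <cassert>
#include <memory>

struct Session;  // hypothetical stand-in for KuduSession

// Hypothetical stand-in for BackgroundFlusher: keeps only a weak reference
// back to the session that owns it.
struct Flusher {
  std::weak_ptr<Session> weak_session;
  bool RunOnce() const;  // true if the session was still alive
};

struct Session {
  explicit Session(bool* destroyed) : destroyed_(destroyed) {}
  ~Session() { *destroyed_ = true; }
  std::unique_ptr<Flusher> flusher;  // the session owns its flusher

 private:
  bool* destroyed_;
};

bool Flusher::RunOnce() const {
  // Take a temporary strong reference for one iteration, e.g. to hand a
  // shared_ptr to a newly created batcher.
  std::shared_ptr<Session> session = weak_session.lock();
  return session != nullptr;
}

std::shared_ptr<Session> MakeSession(bool* destroyed) {
  std::shared_ptr<Session> session = std::make_shared<Session>(destroyed);
  session->flusher.reset(new Flusher());
  session->flusher->weak_session = session;  // no cycle: weak reference only
  return session;
}
```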

PS2, Line 155: consequitive
> nit typo

Line 158:     // whether the thread is shutdown (guarded by cond_mutex_)
> nit: capitalize here and elsewhere

Line 169:   }; // class KuduSession::Data::Flusher
> we don't usually use class closing comments


Gerrit-MessageType: comment
Gerrit-Change-Id: Idfde05bef342db24990c6e3da3b0270c3bb37a9d
Gerrit-PatchSet: 2
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Alexey Serbin <>
Gerrit-Reviewer: Adar Dembo <>
Gerrit-Reviewer: Alexey Serbin <>
Gerrit-Reviewer: David Ribeiro Alves <>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-Reviewer: Todd Lipcon <>
Gerrit-HasComments: Yes
