Adar Dembo has posted comments on this change.

Change subject: KUDU-456 Implement AUTO_FLUSH_BACKGROUND flush mode

Patch Set 15:


Here's my first pass.

Commit Message:

PS15, Line 20: waterline
Nit: I think elsewhere you changed this to watermark; could you do the same 
here, for consistency?

File src/kudu/client/client-internal.h:

Line 240:  private:
In the PIMPL idiom, the impl class is typically completely public. Not sure 
what privatizing these deleted constructors/operators buys us.
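
For reference, a minimal sketch of the PIMPL layout I have in mind. The 
Session class and its name_ member are hypothetical stand-ins, not the real 
client classes; the point is only that the impl class keeps everything public, 
including its deleted copy operations.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Public-facing class: exposes the API and hides state behind a pointer.
// (Hypothetical stand-in, not the real KuduSession.)
class Session {
 public:
  explicit Session(std::string name);
  ~Session();

  const std::string& name() const;

  class Data;  // Implementation class, defined outside the public header.

 private:
  std::unique_ptr<Data> data_;
};

// Implementation class: all members are public, because only the owning
// class's translation units ever see this definition.
class Session::Data {
 public:
  explicit Data(std::string name) : name_(std::move(name)) {}

  // Copying is still disallowed, but the deleted members stay public.
  Data(const Data&) = delete;
  Data& operator=(const Data&) = delete;

  std::string name_;
};

// Defined after Session::Data is complete so unique_ptr can destroy it.
Session::Session(std::string name) : data_(new Data(std::move(name))) {}
Session::~Session() = default;
const std::string& Session::name() const { return data_->name_; }
```

Access control on the impl adds little here, since the impl type is already 
invisible to users of the public header.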

File src/kudu/client/

Line 53: #include "kudu/gutil/strings/human_readable.h"
Is this used for anything?

Line 692:   : data_(new KuduSession::Data(client, client->data_->messenger_)) {
Since the messenger is reachable through the client, why not have 
KuduSession::Data do it in the constructor? Is it to avoid more friendship?

Line 733: Status KuduSession::SetMutationBufferSpace(size_t size) {
One of the simplifying assumptions made by the Java client is that many (if not 
all) of these setters may only be called when there are no pending operations. 
I suggest we do the same here; I think it'll obviate the need to broadcast on 
flow_control_cond_, for one.
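
Roughly what I mean, as a self-contained sketch. Status here is a minimal 
stand-in for kudu::Status, and SessionSketch, Apply(), Flush(), and the 
pending_ops_ counter are hypothetical names used only for illustration; the 
default buffer size is arbitrary.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Minimal stand-in for kudu::Status so the sketch compiles on its own.
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status IllegalState(std::string msg) {
    return Status(false, std::move(msg));
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

// Hypothetical session state; not the real KuduSession::Data.
class SessionSketch {
 public:
  void Apply() { ++pending_ops_; }    // Pretend one operation was buffered.
  void Flush() { pending_ops_ = 0; }  // Pretend everything was sent.

  // The setter refuses to run while operations are pending, so nobody can
  // be blocked on the old limit and no broadcast is needed.
  Status SetMutationBufferSpace(std::size_t size) {
    if (pending_ops_ > 0) {
      return Status::IllegalState(
          "cannot change buffer size with pending operations");
    }
    buffer_limit_ = size;
    return Status::OK();
  }

  std::size_t buffer_limit() const { return buffer_limit_; }

 private:
  std::size_t pending_ops_ = 0;
  std::size_t buffer_limit_ = 7 * 1024 * 1024;  // Arbitrary sketch default.
};
```

With this shape the setter touches the limit only when the session is idle, 
which is the simplification the Java client relies on.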

File src/kudu/client/client.h:

PS15, Line 1035: In this mode, the Flush() call can be used to block until a 
               :     /// is sent and the buffer has available space again.
To be clear, Flush() also causes a background flush that normally would have 
happened at some point in the near future to happen _now_, right?

PS15, Line 1127: the
Nit: can drop this.

PS15, Line 1129: pushing
Nit: sending

PS15, Line 1130: into
Nit: to the

PS15, Line 1133: OK
Nit: safe

PS15, Line 1134: does not come into play
Nit: has no effect

PS15, Line 1139:   
Nit: got two spaces here, can you fix and check the rest?

PS15, Line 1140: by be 
What happened here?

PS15, Line 1141: n
And here?

Line 1146:   Status SetMutationBufferFlushWatermark(double watermark_pct)
It would also be useful to describe (in the comment) what the default watermark 
percentage is.

PS15, Line 1161: OK
Nit: safe

PS15, Line 1162:  does not come into play
Nit: has no effect

PS15, Line 1166: internal
Nit: interval

File src/kudu/client/

Line 57:       flush_interval_(MonoDelta::FromMilliseconds(10)) {
The default flush interval seems really low. Isn't it something like 5s in 
Java? Why so low here?

PS15, Line 64: -1,
PeriodicFlushTask uses 0, not -1. Is there any significance to this difference? 
If not, can you make them the same?

Line 66:     PeriodicFlushTask(Status::OK(), messenger_, session);
I don't think this is quite the periodic semantics we want.

Take a look at the Java client. It only schedules a background flush when the 
first op is applied. The advantage of that approach is that if you create a 
session, wait (flush_interval - 1) ms, and apply an operation, you will have to 
wait an additional flush_interval ms before your operation is automatically 
flushed. With this approach, you'll get the background flush happening almost 
immediately, giving the client no time to accumulate more operations into the 
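
To make the timing difference concrete, here is a small sketch of the 
arithmetic. Both function names are hypothetical, times are in milliseconds 
relative to session creation, and the first function assumes the periodic 
timer fires at exact multiples of the interval.

```cpp
#include <cassert>
#include <cstdint>

// Timer starts ticking when the session is created: the first op waits only
// until the next tick, which may be almost immediately.
std::int64_t DelayWithTimerFromCreation(std::int64_t apply_ms,
                                        std::int64_t interval_ms) {
  std::int64_t next_tick = (apply_ms / interval_ms + 1) * interval_ms;
  return next_tick - apply_ms;
}

// Timer is armed when the first op is applied: the op always gets a full
// interval in which more operations can accumulate.
std::int64_t DelayWithTimerFromFirstOp(std::int64_t /*apply_ms*/,
                                       std::int64_t interval_ms) {
  return interval_ms;
}
```

With a 1000 ms interval and an op applied at 999 ms, the first policy flushes 
after 1 ms and the second after 1000 ms.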

Line 74:     bytes_flushed = batcher->buffer_bytes_used();
Pull this out of the lock; presumably it's only flushed_batchers_ that needs to 
be protected.

PS15, Line 81: there might be several data pushers
How? Shouldn't there be at most one outstanding thread in Apply(), since one 
KuduSession is meant for one writing thread?

PS15, Line 101: flusher
Not clear what a "flusher" is (now that there's no dedicated thread).

Line 102:   std::lock_guard<Mutex> l(cond_mutex_);
Why is this taken here? flush_mode_ is not protected by cond_mutex_.

Now that I take another look, it seems flush_mode_ can be accessed by the 
reactor thread in PeriodicFlushTask(), so it does need some synchronization, 
but it's not being applied consistently yet (nor is it documented).

Line 109:   flow_control_cond_.Broadcast();
Why do this? We've guaranteed there are no pending operations, so who would 
care about this broadcast?

PS15, Line 115:   // The data flow control logic needs to know if the buffer limit changes.
              :   // There might be several threads calling 
Let's simplify by preventing buffer size changes when there are pending 

Also, if it wasn't already obvious, remember that KuduSession is inherently 
single-threaded: clients must use a separate KuduSession for each writing 

Line 122:   CHECK_GE(watermark_pct, 0);
This seems harsh for a client library; we can return Status::IllegalArgument. 
Should check that it's not over 100 either.
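
Something along these lines, as a sketch. Status is a minimal stand-in for 
kudu::Status, and WatermarkHolder with its default of 50 is hypothetical; only 
the setter name and parameter come from the patch.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Minimal stand-in for kudu::Status so the sketch compiles on its own.
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status IllegalArgument(std::string msg) {
    return Status(false, std::move(msg));
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

// Hypothetical holder for the watermark setting.
class WatermarkHolder {
 public:
  // Rejects out-of-range input with an error instead of crashing the
  // client process. The negated form also rejects NaN.
  Status SetMutationBufferFlushWatermark(double watermark_pct) {
    if (!(watermark_pct >= 0.0 && watermark_pct <= 100.0)) {
      return Status::IllegalArgument(
          "watermark must be between 0 and 100 inclusive");
    }
    watermark_pct_ = watermark_pct;
    return Status::OK();
  }

  double watermark_pct() const { return watermark_pct_; }

 private:
  double watermark_pct_ = 50.0;  // Arbitrary default for the sketch.
};
```

A rejected call leaves the previous value in place, so the caller can report 
the error and carry on.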

Line 135:   CHECK_GE(timeout_ms, 0);
Too harsh here too, but I guess we're locked into the void return value now.

Line 169:       return 0;
Nit: indentation.

PS15, Line 230:   RETURN_NOT_OK(TryApplyWriteOp(write_op, &required_size,
              :                                 &current_flush_mode, 
The output parameters are confusing:
- The flush mode can't change mid-operation.
- The watermark shouldn't either; a session is accessible to only one client 
- Can't this function find out the required size instead of asking 
TryApplyWriteOp() to do it on its behalf?

PS15, Line 321: accomodate
Nit: accommodate

Line 393:   if (data->flush_mode_ != AUTO_FLUSH_BACKGROUND) {
Doesn't this mean flush_mode_ needs to be synchronized? Otherwise a client 
thread might be trying to change it while the reactor thread is reading it here.
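
One possible shape, sketched with std::atomic. FlushModeHolder and 
ShouldBackgroundFlush() are hypothetical names, and the initial mode is just a 
sketch default. An atomic only makes the individual read and write race-free; 
if the mode has to change together with other session state, a mutex would be 
the safer choice.

```cpp
#include <atomic>
#include <cassert>

enum FlushMode { AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH };

// Hypothetical wrapper: the client thread writes the mode, the reactor
// thread reads it, and neither needs to hold a lock to do so.
class FlushModeHolder {
 public:
  void set(FlushMode mode) { mode_.store(mode, std::memory_order_release); }
  FlushMode get() const { return mode_.load(std::memory_order_acquire); }

 private:
  std::atomic<FlushMode> mode_{AUTO_FLUSH_SYNC};
};

// What the periodic task's early-exit check could look like.
bool ShouldBackgroundFlush(const FlushModeHolder& holder) {
  return holder.get() == AUTO_FLUSH_BACKGROUND;
}
```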

Line 398:   data->NextBatcher(session, 0, nullptr);
Seems KuduSession::Data::Init() with AUTO_FLUSH_BACKGROUND will flush an empty 
batcher here, no?

File src/kudu/client/session-internal.h:

Line 83:   void NextBatcher(const sp::weak_ptr<KuduSession>& session,
Please doc the significance of 'watermark'.

PS15, Line 87: On successful return, the output flush_mode parameter is set
             :   // to the effective flush mode.
Why? Why can't the caller just look at flush_mode_ directly?

Line 124:  private:
How did you decide which members were private and which were public? Especially 
for data members, it seems rather arbitrary.

Line 131:   Status TryApplyWriteOp(KuduWriteOperation* write_op, int64_t* 
This doesn't actually apply the operation, though. It just checks to see if it 
can. Call it CanApplyWriteOp?

PS15, Line 159: Mutex for the condition variables below and for other
              :   // condition-related counters and members.
There's only one condition variable now.

Line 161:   Mutex cond_mutex_;
What's the relationship between this new lock and lock_? Please add a comment 
somewhere explaining whether it's OK to acquire both (and if so in what order).
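
The kind of comment I'm after, shown as a sketch. The ordering rule, the 
guarded fields, and OnBatcherFlushed() are all assumptions made up for 
illustration, and std::mutex stands in for the Kudu lock types; the actual 
rule has to come from the patch author.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

struct SessionLocksSketch {
  // Lock ordering (assumed for this sketch): when both locks are needed,
  // acquire lock_ first and cond_mutex_ second. Never acquire lock_ while
  // holding cond_mutex_.
  std::mutex lock_;        // Guards flushed_batchers.
  std::mutex cond_mutex_;  // Guards bytes_in_flight.

  int flushed_batchers = 0;
  std::int64_t bytes_in_flight = 0;

  // Takes both locks in the documented order.
  void OnBatcherFlushed(std::int64_t bytes) {
    std::lock_guard<std::mutex> l(lock_);
    ++flushed_batchers;
    std::lock_guard<std::mutex> c(cond_mutex_);
    bytes_in_flight += bytes;
  }
};
```

Writing the rule next to the declarations lets every new call site be checked 
against it.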

File src/kudu/client/write_op.h:

PS15, Line 112: Once called, the result is cached
              :   // so subsequent calls will return the size previously computed.  Set the
              :   // parameter to 'false' to re-compute the size.
Why cache at all? As far as I can tell, there's only one call to 
SizeInBuffer(): when the operation is applied.

Also, all of the callers omit use_cached_result, so what's the point of the 
non-cached option?


Gerrit-MessageType: comment
Gerrit-Change-Id: I34905c30b3aad96f53cf7a1822b1cde6d25f33a8
Gerrit-PatchSet: 15
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Alexey Serbin <>
Gerrit-Reviewer: Adar Dembo <>
Gerrit-Reviewer: Alexey Serbin <>
Gerrit-Reviewer: Dan Burkert <>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-Reviewer: Todd Lipcon <>
Gerrit-HasComments: Yes
