Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-14 Thread Dong Lin
Hey all,

After discussion with Becket in the PR
https://github.com/apache/kafka/pull/3621, I have updated the KIP-113 to
make the following minor changes to the protocol:

1) Renamed DescribeDirsRequest (and DescribeDirsResponse) to
DescribeLogDirsRequest (and DescribeLogDirsResponse).

This change is intended to better distinguish between the log directory and
the partition's directory.

2) Removed "log_dirs" field from DescribeLogDirsRequest

DescribeLogDirsRequest can be used for two purposes. It can be used to get
the current list of replicas and their sizes on the broker so that the user
can determine the new assignment. And it can be used to check the current log
directory of a replica to verify whether the replica movement has completed.
In either case, all log directories on the broker need to be queried, and thus
we don't have a use-case for the log_dirs field in the DescribeLogDirsRequest.
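
To illustrate both uses, a reassignment tool could do something along these
lines. This is only a rough sketch: the admin method, result and info types
(DescribeLogDirsResult, LogDirInfo, ReplicaInfo) follow the KIP's proposal,
and the exact names and accessors should be treated as placeholders.

// Sketch: query all log directories on one broker and inspect each replica.
// The enclosing log dir tells the tool whether a requested movement has
// completed; the size feeds the computation of a new assignment.
DescribeLogDirsResult result = admin.describeLogDirs(Collections.singleton(brokerId));
Map<String, LogDirInfo> dirs = result.values().get(brokerId).get();
for (Map.Entry<String, LogDirInfo> dir : dirs.entrySet()) {
    for (Map.Entry<TopicPartition, ReplicaInfo> r : dir.getValue().replicaInfos.entrySet()) {
        System.out.printf("%s dir=%s size=%d%n", r.getKey(), dir.getKey(), r.getValue().size);
    }
}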

3) Renamed new APIs in the admin client as appropriate to replace "Dir"
with "LogDir".

Thanks,
Dong






On Fri, Aug 11, 2017 at 10:01 AM, Jun Rao  wrote:

> Hi, Tom,
>
> The inter-broker-with-log-dirs case can be split into
> inter-broker-w/o-log-dirs and log-dirs change per broker. KIP-113 proposes
> to do the split in the tool. I am not sure if we really need to persist
> log-dirs changes in ZK. During the discussion in KIP-113, we realized that
> there is only a very short window when this information could be lost. In
> the rare cases when this info is lost, one can always issue
> AlterReplicaDirRequests
> again. With this, once we add PartitionReassignmentRequest (w/o log dirs),
> the reassignment tool won't need ZK, right?
>
> If you have a single request for inter-broker-with-log-dirs, the receiving
> broker has to do the split. Perhaps you can write down how the receiving
> broker processes the merged request. Then, we can see how much additional
> complexity is needed. Ideally, it would be useful to avoid adding
> additional logic in the controller for it to understand log dirs.
>
> Thanks,
>
> Jun
>
> On Fri, Aug 11, 2017 at 8:49 AM, Tom Bentley 
> wrote:
>
> > Hi Jun,
> >
> > The inter-broker movement case has two subcases:
> >
> > 1. Where no log dir is supplied. This corresponds to the existing
> > kafka-reassign-partitions script. This just needs the appropriate JSON to
> > be written to the reassignment znode.
> > 2. Where the log dir is supplied. This is covered in KIP-113 (in addition
> > to the intra-broker case) and that KIP defines an algorithm where an
> > initial AlterReplicaDirRequests is sent to each receiving broker, then
> the
> > znode gets updated, then there are further AlterReplicaDirRequests.
> >
> > In the first case, the JSON lacks any log dir information. In the second
> > case the JSON includes log dir information. I'm suggesting that a single
> > PartitionReassignmentRequest class could be used to represent (and be
> > convertible to) both kinds of JSON. (In fact the one JSON schema is a
> > subset of the other).
> >
> > So PartitionReassignmentRequest would indeed only be necessary for
> > inter-broker movement, but it would be necessary in both the with- and
> > without log dir cases of that.
> >
> > While I could have a PartitionReassignmentRequest that only dealt with
> > inter-broker-without-log-dirs data movement, that wouldn't be enough to
> > address the needs of KIP-179, because the inter-broker-with-log-dirs case
> > still needs to update the znode, and KIP-179 is all about the
> > script/command not talking to Zookeeper any more.
> >
> > Does that make sense to you?
> >
> > Cheers,
> >
> > Tom
> >
> >
> > On 11 August 2017 at 16:22, Jun Rao  wrote:
> >
> > > Hi, Tom,
> > >
> > > One approach is to have a PartitionReassignmentRequest that only deals
> > with
> > > inter broker data movement (i.e, w/o any log dirs in the request). The
> > > request is directed to any broker, which then just writes the
> > reassignment
> > > json to ZK. There is a separate AlterReplicaDirRequest that only deals
> > with
> > > intra broker data movement (i.e., with the log dirs in the request).
> This
> > > request is directed to the specific broker who replicas need to moved
> btw
> > > log dirs. This seems to be what's in your original proposal in KIP-179,
> > > which I think makes sense.
> > >
> > > In your early email, I thought you were proposing to have
> > > PartitionReassignmentRequest
> > > dealing with both inter and intra broker data movement (i.e., include
> log
> > > dirs in the request). Then, I am not sure how this request will be
> > > processed on the broker. So, you were not proposing that?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Aug 11, 2017 at 5:37 AM, Tom Bentley 
> > > wrote:
> > >
> > > > Hi Jun and Dong,
> > > >
> > > > Thanks for your replies...
> > > >
> > > > On 10 August 2017 at 20:43, Dong Lin  wrote:
> > > >
> > > > > This is a very good idea. I have 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-11 Thread Jun Rao
Hi, Tom,

The inter-broker-with-log-dirs case can be split into
inter-broker-w/o-log-dirs and log-dirs change per broker. KIP-113 proposes
to do the split in the tool. I am not sure if we really need to persist
log-dirs changes in ZK. During the discussion in KIP-113, we realized that
there is only a very short window when this information could be lost. In
the rare cases when this info is lost, one can always issue
AlterReplicaDirRequests
again. With this, once we add PartitionReassignmentRequest (w/o log dirs),
the reassignment tool won't need ZK, right?
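
To make the split concrete, the tool-side logic could look roughly like the
sketch below. The helper names (groupLogDirsByBroker, stripLogDirs, etc.) and
the ReplicaAssignment type are invented purely for illustration; they are not
part of either KIP.

// Sketch only: split one user-supplied assignment into the two existing mechanisms.
void reassignWithLogDirs(Map<TopicPartition, List<ReplicaAssignment>> target) {
    // Per-broker log-dir changes: tell each receiving broker where each replica should land.
    for (Map.Entry<Integer, Map<TopicPartition, String>> e : groupLogDirsByBroker(target).entrySet()) {
        sendAlterReplicaDirRequest(e.getKey(), e.getValue());   // broker id -> (partition -> log dir)
    }
    // Inter-broker move without log dirs: today this is the reassignment JSON written to ZK;
    // once PartitionReassignmentRequest (w/o log dirs) exists it can go to any broker instead.
    submitInterBrokerReassignment(stripLogDirs(target));
    // If a broker loses its in-memory (replica, log dir) mapping, the tool just
    // re-sends AlterReplicaDirRequest; nothing needs to be persisted in ZK.
}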

If you have a single request for inter-broker-with-log-dirs, the receiving
broker has to do the split. Perhaps you can write down how the receiving
broker processes the merged request. Then, we can see how much additional
complexity is needed. Ideally, it would be useful to avoid adding
additional logic in the controller for it to understand log dirs.

Thanks,

Jun

On Fri, Aug 11, 2017 at 8:49 AM, Tom Bentley  wrote:

> Hi Jun,
>
> The inter-broker movement case has two subcases:
>
> 1. Where no log dir is supplied. This corresponds to the existing
> kafka-reassign-partitions script. This just needs the appropriate JSON to
> be written to the reassignment znode.
> 2. Where the log dir is supplied. This is covered in KIP-113 (in addition
> to the intra-broker case) and that KIP defines an algorithm where an
> initial AlterReplicaDirRequests is sent to each receiving broker, then the
> znode gets updated, then there are further AlterReplicaDirRequests.
>
> In the first case, the JSON lacks any log dir information. In the second
> case the JSON includes log dir information. I'm suggesting that a single
> PartitionReassignmentRequest class could be used to represent (and be
> convertible to) both kinds of JSON. (In fact the one JSON schema is a
> subset of the other).
>
> So PartitionReassignmentRequest would indeed only be necessary for
> inter-broker movement, but it would be necessary in both the with- and
> without log dir cases of that.
>
> While I could have a PartitionReassignmentRequest that only dealt with
> inter-broker-without-log-dirs data movement, that wouldn't be enough to
> address the needs of KIP-179, because the inter-broker-with-log-dirs case
> still needs to update the znode, and KIP-179 is all about the
> script/command not talking to Zookeeper any more.
>
> Does that make sense to you?
>
> Cheers,
>
> Tom
>
>
> On 11 August 2017 at 16:22, Jun Rao  wrote:
>
> > Hi, Tom,
> >
> > One approach is to have a PartitionReassignmentRequest that only deals
> with
> > inter broker data movement (i.e, w/o any log dirs in the request). The
> > request is directed to any broker, which then just writes the
> reassignment
> > json to ZK. There is a separate AlterReplicaDirRequest that only deals
> with
> > intra broker data movement (i.e., with the log dirs in the request). This
> > request is directed to the specific broker who replicas need to moved btw
> > log dirs. This seems to be what's in your original proposal in KIP-179,
> > which I think makes sense.
> >
> > In your early email, I thought you were proposing to have
> > PartitionReassignmentRequest
> > dealing with both inter and intra broker data movement (i.e., include log
> > dirs in the request). Then, I am not sure how this request will be
> > processed on the broker. So, you were not proposing that?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Aug 11, 2017 at 5:37 AM, Tom Bentley 
> > wrote:
> >
> > > Hi Jun and Dong,
> > >
> > > Thanks for your replies...
> > >
> > > On 10 August 2017 at 20:43, Dong Lin  wrote:
> > >
> > > > This is a very good idea. I have updated the KIP-113 so that
> > > > DescribeDirResponse returns lag instead of LEO.
> > >
> > >
> > > Excellent!
> > >
> > > On Thu, Aug 10, 2017 at 10:21 AM, Jun Rao  wrote:
> > > >
> > > > > 2. Tom, note that currently, the LeaderAndIsrRequest doesn't
> specify
> > > the
> > > > > log dir. So, I am not sure in your new proposal, how the log dir
> info
> > > is
> > > > > communicated to all brokers. Is the broker receiving the
> > > > > ReassignPartitionsRequest
> > > > > going to forward that to all brokers?
> > > >
> > >
> > > My understanding of KIP-113 is that each broker has its own set of log
> > dirs
> > > (even though in practice they might all have the same names, and might
> > all
> > > be distributed across the brokers disks in the same way, and all those
> > > disks might be identical), so it doesn't make sense for one broker to
> be
> > > told about the log dirs of another broker.
> > >
> > > Furthermore, it is the AlterReplicaDirRequest that is sent to the
> > receiving
> > > broker which associates the partition with the log dir on that broker.
> To
> > > quote from KIP-113 (specifically, the notes in this section
> > >  > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-11 Thread Tom Bentley
Hi Jun,

The inter-broker movement case has two subcases:

1. Where no log dir is supplied. This corresponds to the existing
kafka-reassign-partitions script. This just needs the appropriate JSON to
be written to the reassignment znode.
2. Where the log dir is supplied. This is covered in KIP-113 (in addition
to the intra-broker case) and that KIP defines an algorithm where an
initial AlterReplicaDirRequest is sent to each receiving broker, then the
znode gets updated, then further AlterReplicaDirRequests are sent (a rough
sketch of that sequence follows below).
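
A rough sketch of that case-2 sequence, as driven by the tool (the helper
names here are invented for illustration, not part of KIP-113):

// Sketch only.
sendAlterReplicaDirRequests(brokerToReplicaLogDirs);      // receiving brokers remember (replica, dir) pairs
writeReassignmentZnode(assignmentWithoutLogDirs);         // controller reacts to the znode as it does today
while (!allReplicasReportTargetLogDir()) {
    sendAlterReplicaDirRequests(brokerToReplicaLogDirs);  // retry until every replica is in its target dir
}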

In the first case, the JSON lacks any log dir information. In the second
case the JSON includes log dir information. I'm suggesting that a single
PartitionReassignmentRequest class could be used to represent (and be
convertible to) both kinds of JSON. (In fact the one JSON schema is a
subset of the other).

So PartitionReassignmentRequest would indeed only be necessary for
inter-broker movement, but it would be necessary in both the with- and
without log dir cases of that.

While I could have a PartitionReassignmentRequest that only dealt with
inter-broker-without-log-dirs data movement, that wouldn't be enough to
address the needs of KIP-179, because the inter-broker-with-log-dirs case
still needs to update the znode, and KIP-179 is all about the
script/command not talking to Zookeeper any more.

Does that make sense to you?

Cheers,

Tom


On 11 August 2017 at 16:22, Jun Rao  wrote:

> Hi, Tom,
>
> One approach is to have a PartitionReassignmentRequest that only deals with
> inter broker data movement (i.e, w/o any log dirs in the request). The
> request is directed to any broker, which then just writes the reassignment
> json to ZK. There is a separate AlterReplicaDirRequest that only deals with
> intra broker data movement (i.e., with the log dirs in the request). This
> request is directed to the specific broker who replicas need to moved btw
> log dirs. This seems to be what's in your original proposal in KIP-179,
> which I think makes sense.
>
> In your early email, I thought you were proposing to have
> PartitionReassignmentRequest
> dealing with both inter and intra broker data movement (i.e., include log
> dirs in the request). Then, I am not sure how this request will be
> processed on the broker. So, you were not proposing that?
>
> Thanks,
>
> Jun
>
> On Fri, Aug 11, 2017 at 5:37 AM, Tom Bentley 
> wrote:
>
> > Hi Jun and Dong,
> >
> > Thanks for your replies...
> >
> > On 10 August 2017 at 20:43, Dong Lin  wrote:
> >
> > > This is a very good idea. I have updated the KIP-113 so that
> > > DescribeDirResponse returns lag instead of LEO.
> >
> >
> > Excellent!
> >
> > On Thu, Aug 10, 2017 at 10:21 AM, Jun Rao  wrote:
> > >
> > > > 2. Tom, note that currently, the LeaderAndIsrRequest doesn't specify
> > the
> > > > log dir. So, I am not sure in your new proposal, how the log dir info
> > is
> > > > communicated to all brokers. Is the broker receiving the
> > > > ReassignPartitionsRequest
> > > > going to forward that to all brokers?
> > >
> >
> > My understanding of KIP-113 is that each broker has its own set of log
> dirs
> > (even though in practice they might all have the same names, and might
> all
> > be distributed across the brokers disks in the same way, and all those
> > disks might be identical), so it doesn't make sense for one broker to be
> > told about the log dirs of another broker.
> >
> > Furthermore, it is the AlterReplicaDirRequest that is sent to the
> receiving
> > broker which associates the partition with the log dir on that broker. To
> > quote from KIP-113 (specifically, the notes in this section
> >  > 3A+Support+replicas+movement+between+log+directories#KIP-
> > 113:Supportreplicasmovementbetweenlogdirectories-1%29Howtomo
> > vereplicabetweenlogdirectoriesonthesamebroker>
> > ):
> >
> > - If broker doesn't not have already replica created for the specified
> > > topicParition when it receives AlterReplicaDirRequest, it will reply
> > > ReplicaNotAvailableException AND remember (replica, destination log
> > > directory) pair in memory to create the replica in the specified log
> > > directory when it receives LeaderAndIsrRequest later.
> > >
> >
> > I've not proposed anything to change that, really. All I've done is
> change
> > who creates the znode which causes the LeaderAndIsrRequest. Because
> KIP-113
> > has been accepted, I've tried to avoid attempting to change it too much.
> >
> > Cheers,
> >
> > Tom
> >
>


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-11 Thread Jun Rao
Hi, Tom,

One approach is to have a PartitionReassignmentRequest that only deals with
inter-broker data movement (i.e., w/o any log dirs in the request). The
request is directed to any broker, which then just writes the reassignment
json to ZK. There is a separate AlterReplicaDirRequest that only deals with
intra-broker data movement (i.e., with the log dirs in the request). This
request is directed to the specific broker whose replicas need to be moved
between log dirs. This seems to be what's in your original proposal in KIP-179,
which I think makes sense.

In your earlier email, I thought you were proposing to have
PartitionReassignmentRequest
dealing with both inter and intra broker data movement (i.e., include log
dirs in the request). Then, I am not sure how this request will be
processed on the broker. So, you were not proposing that?

Thanks,

Jun

On Fri, Aug 11, 2017 at 5:37 AM, Tom Bentley  wrote:

> Hi Jun and Dong,
>
> Thanks for your replies...
>
> On 10 August 2017 at 20:43, Dong Lin  wrote:
>
> > This is a very good idea. I have updated the KIP-113 so that
> > DescribeDirResponse returns lag instead of LEO.
>
>
> Excellent!
>
> On Thu, Aug 10, 2017 at 10:21 AM, Jun Rao  wrote:
> >
> > > 2. Tom, note that currently, the LeaderAndIsrRequest doesn't specify
> the
> > > log dir. So, I am not sure in your new proposal, how the log dir info
> is
> > > communicated to all brokers. Is the broker receiving the
> > > ReassignPartitionsRequest
> > > going to forward that to all brokers?
> >
>
> My understanding of KIP-113 is that each broker has its own set of log dirs
> (even though in practice they might all have the same names, and might all
> be distributed across the brokers disks in the same way, and all those
> disks might be identical), so it doesn't make sense for one broker to be
> told about the log dirs of another broker.
>
> Furthermore, it is the AlterReplicaDirRequest that is sent to the receiving
> broker which associates the partition with the log dir on that broker. To
> quote from KIP-113 (specifically, the notes in this section
>  3A+Support+replicas+movement+between+log+directories#KIP-
> 113:Supportreplicasmovementbetweenlogdirectories-1%29Howtomo
> vereplicabetweenlogdirectoriesonthesamebroker>
> ):
>
> - If broker doesn't not have already replica created for the specified
> > topicParition when it receives AlterReplicaDirRequest, it will reply
> > ReplicaNotAvailableException AND remember (replica, destination log
> > directory) pair in memory to create the replica in the specified log
> > directory when it receives LeaderAndIsrRequest later.
> >
>
> I've not proposed anything to change that, really. All I've done is change
> who creates the znode which causes the LeaderAndIsrRequest. Because KIP-113
> has been accepted, I've tried to avoid attempting to change it too much.
>
> Cheers,
>
> Tom
>


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-11 Thread Tom Bentley
Hi Jun and Dong,

Thanks for your replies...

On 10 August 2017 at 20:43, Dong Lin  wrote:

> This is a very good idea. I have updated the KIP-113 so that
> DescribeDirResponse returns lag instead of LEO.


Excellent!

On Thu, Aug 10, 2017 at 10:21 AM, Jun Rao  wrote:
>
> > 2. Tom, note that currently, the LeaderAndIsrRequest doesn't specify the
> > log dir. So, I am not sure in your new proposal, how the log dir info is
> > communicated to all brokers. Is the broker receiving the
> > ReassignPartitionsRequest
> > going to forward that to all brokers?
>

My understanding of KIP-113 is that each broker has its own set of log dirs
(even though in practice they might all have the same names, and might all
be distributed across the brokers' disks in the same way, and all those
disks might be identical), so it doesn't make sense for one broker to be
told about the log dirs of another broker.

Furthermore, it is the AlterReplicaDirRequest that is sent to the receiving
broker which associates the partition with the log dir on that broker. To
quote from KIP-113 (specifically, the notes in this section

):

- If the broker does not already have a replica created for the specified
> topicPartition when it receives AlterReplicaDirRequest, it will reply
> ReplicaNotAvailableException AND remember the (replica, destination log
> directory) pair in memory to create the replica in the specified log
> directory when it receives LeaderAndIsrRequest later.
>
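
In other words, on the receiving broker the handling is roughly as sketched
below. This is just my reading of the KIP; the field and method names are
invented for illustration (only Errors.REPLICA_NOT_AVAILABLE is an existing
error code).

// Sketch of the broker-side behaviour quoted above; names are illustrative only.
void handleAlterReplicaDir(TopicPartition tp, String targetLogDir) {
    if (!replicaExists(tp)) {
        // Replica not created yet: remember the desired log dir and ask the caller to retry.
        pendingLogDirMoves.put(tp, targetLogDir);       // consulted when LeaderAndIsrRequest arrives
        respondWith(Errors.REPLICA_NOT_AVAILABLE);
    } else {
        startIntraBrokerMove(tp, targetLogDir);         // copy into a ".move" dir, then swap
    }
}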

I've not proposed anything to change that, really. All I've done is change
who creates the znode which causes the LeaderAndIsrRequest. Because KIP-113
has been accepted, I've tried to avoid attempting to change it too much.

Cheers,

Tom


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Dong Lin
Hey Jun,

This is a very good idea. I have updated the KIP-113 so that
DescribeDirsResponse returns lag instead of LEO. If the replica is not a
temporary replica, then lag = max(0, HW - LEO). Otherwise, lag = the primary
replica's LEO - the temporary replica's LEO.
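
In code form the reported lag is roughly (illustrative only):

// Sketch of the per-replica lag reported in the response; variable names are illustrative.
long lag = isTemporaryReplica
        ? primaryReplicaLEO - temporaryReplicaLEO       // intra-broker ".move" replica
        : Math.max(0L, highWatermark - logEndOffset);   // permanent replica, 0 once in sync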

Thanks!
Dong

On Thu, Aug 10, 2017 at 10:21 AM, Jun Rao  wrote:

> Hi, Tom, Dong,
>
> A couple of comments on that.
>
> 1. I think we can unify the reporting of lags. Basically, the lag will be
> reported on every replica (temporary or permanent), not just at the leader
> replica. If it's permanent, lag is max(0, HW - LEO) as it is now.
> Otherwise, lag is (LEO of permanent replica - LEO of temporary replica).
> That way, it seems that we can use a single request to monitor the progress
> of both inter and intra replica movement and it would be more accurate than
> relying on LEO directly.
>
> 2. Tom, note that currently, the LeaderAndIsrRequest doesn't specify the
> log dir. So, I am not sure in your new proposal, how the log dir info is
> communicated to all brokers. Is the broker receiving the
> ReassignPartitionsRequest
> going to forward that to all brokers?
>
> Thanks,
>
> Jun
>
>
>
> On Thu, Aug 10, 2017 at 6:57 AM, Tom Bentley 
> wrote:
>
> > I've spent some time thinking about KIP-179 and KIP-113, the proposed
> > algorithms and APIs, and trying to weigh the pros and cons of various
> > alternative options.
> >
> > I think Dong's reasons for the algorithm for inter-broker move in KIP-113
> > make a lot of sense. I don't think it would be at all simple to try to
> > change that algorithm to one where the whole thing can be triggered by a
> > single call to an AdminClient method. So I guess we should try to keep as
> > much of that algorithm as possible.
> >
> > KIP-179 will need to change this step
> >
> >  - The script creates reassignment znode in zookeeper.
> > >
> >
> > with an AdminClient API call. This call can the same one as currently
> > specified in KIP-179 -- reassignPartitions() -- except the argument needs
> > to take into account the need to pass log dirs as well as broker ids.
> Thus
> > I would suggest
> >
> > ReassignPartitionsResult reassignPartitions(Map > List> assignments)
> >
> > where:
> >
> > class ReplicAssignment {
> > int broker()
> > String logDirectory()// can be null
> > }
> >
> > (This is just a Java representation of the reassignment json in KIP-113,
> > which itself is a superset of the reassignment json currently in use)
> >
> > The corresponding protocol would look like this:
> >
> > ReassignPartitionsRequest => timeout validate_only log_dirs
> > [topic_assignment]
> >   timout => int32
> >   validate_only => boolean
> >   log_dirs => [string]
> >   topic_assignment => topic [partition_assignment]
> > topic => string
> > partition_assignment => partition [replica]
> >   partition => int32
> >   replica => broker log_dir_index
> > broker => int32
> > log_dir_index => int16
> >
> > The purpose of log_dirs is to serialize each log dir in the request only
> > once. These are then referred to by index in log_dir_index. The
> > log_dir_index can be -1, which means the caller doesn't care which log
> > directory should be used on the receiving broker.
> >
> > This request can be sent to *any* broker. The broker which receives a
> > ReassignPartitionsRequest essentially converts it into reassignment JSON
> > and writes that JSON to the znode, then returns a
> > ReassignPartitionsResponse:
> >
> > ReassignPartitionsResponse => throttle_time_ms
> > [topic_assignment_result]
> >   throttle_time_ms => INT32
> >   log_dirs => [string]
> >   topic_assignment_result => topic partition_assignment_result
> > topic => STRING
> > partition_assignment_result => partition [replica_result]
> >   partition => int32
> >   replica_result => broker log_dir_index error_code
> > error_message
> > broker => int32
> > log_dir_index => int16
> > error_code => INT16
> >  error_message => NULLABLE_STRING
> >
> > This is using the same encoding scheme as wrt log_dirs as described
> above.
> >
> > Meanwhile the controller is notified by ZK of the change in value of the
> > znode and proceeds, as currently, by sending LeaderAndIsrRequest and
> > StopReplicaRequest in order to complete the reassignments.
> >
> > The remaining problem is around how to measure progress of reassignment.
> As
> > mentioned in the email I wrote this morning, I think we really need two
> > different lag calculations if we're using the lag to measure progress and
> > we want the property that lag=0 means reassignment has finished. The
> > problem with that, I now realise, is the script might be called with
> > reassignments which are a mix of:
> >
> > * inter-broker moves without a 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Jun Rao
Hi, Tom, Dong,

A couple of comments on that.

1. I think we can unify the reporting of lags. Basically, the lag will be
reported on every replica (temporary or permanent), not just at the leader
replica. If it's permanent, lag is max(0, HW - LEO) as it is now.
Otherwise, lag is (LEO of permanent replica - LEO of temporary replica).
That way, it seems that we can use a single request to monitor the progress
of both inter and intra replica movement and it would be more accurate than
relying on LEO directly.

2. Tom, note that currently, the LeaderAndIsrRequest doesn't specify the
log dir. So, I am not sure in your new proposal, how the log dir info is
communicated to all brokers. Is the broker receiving the
ReassignPartitionsRequest
going to forward that to all brokers?

Thanks,

Jun



On Thu, Aug 10, 2017 at 6:57 AM, Tom Bentley  wrote:

> I've spent some time thinking about KIP-179 and KIP-113, the proposed
> algorithms and APIs, and trying to weigh the pros and cons of various
> alternative options.
>
> I think Dong's reasons for the algorithm for inter-broker move in KIP-113
> make a lot of sense. I don't think it would be at all simple to try to
> change that algorithm to one where the whole thing can be triggered by a
> single call to an AdminClient method. So I guess we should try to keep as
> much of that algorithm as possible.
>
> KIP-179 will need to change this step
>
>  - The script creates reassignment znode in zookeeper.
> >
>
> with an AdminClient API call. This call can the same one as currently
> specified in KIP-179 -- reassignPartitions() -- except the argument needs
> to take into account the need to pass log dirs as well as broker ids. Thus
> I would suggest
>
> ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<ReplicAssignment>> assignments)
>
> where:
>
> class ReplicAssignment {
> int broker()
> String logDirectory()// can be null
> }
>
> (This is just a Java representation of the reassignment json in KIP-113,
> which itself is a superset of the reassignment json currently in use)
>
> The corresponding protocol would look like this:
>
> ReassignPartitionsRequest => timeout validate_only log_dirs
> [topic_assignment]
>   timout => int32
>   validate_only => boolean
>   log_dirs => [string]
>   topic_assignment => topic [partition_assignment]
> topic => string
> partition_assignment => partition [replica]
>   partition => int32
>   replica => broker log_dir_index
> broker => int32
> log_dir_index => int16
>
> The purpose of log_dirs is to serialize each log dir in the request only
> once. These are then referred to by index in log_dir_index. The
> log_dir_index can be -1, which means the caller doesn't care which log
> directory should be used on the receiving broker.
>
> This request can be sent to *any* broker. The broker which receives a
> ReassignPartitionsRequest essentially converts it into reassignment JSON
> and writes that JSON to the znode, then returns a
> ReassignPartitionsResponse:
>
> ReassignPartitionsResponse => throttle_time_ms
> [topic_assignment_result]
>   throttle_time_ms => INT32
>   log_dirs => [string]
>   topic_assignment_result => topic partition_assignment_result
> topic => STRING
> partition_assignment_result => partition [replica_result]
>   partition => int32
>   replica_result => broker log_dir_index error_code
> error_message
> broker => int32
> log_dir_index => int16
> error_code => INT16
>  error_message => NULLABLE_STRING
>
> This is using the same encoding scheme as wrt log_dirs as described above.
>
> Meanwhile the controller is notified by ZK of the change in value of the
> znode and proceeds, as currently, by sending LeaderAndIsrRequest and
> StopReplicaRequest in order to complete the reassignments.
>
> The remaining problem is around how to measure progress of reassignment. As
> mentioned in the email I wrote this morning, I think we really need two
> different lag calculations if we're using the lag to measure progress and
> we want the property that lag=0 means reassignment has finished. The
> problem with that, I now realise, is the script might be called with
> reassignments which are a mix of:
>
> * inter-broker moves without a log dir (=> use HW-replicaLEO)
> * inter-broker moves with a log dir (=> use HW-replicaLEO)
> * intra-broker moves with a log dir (=> use .log_LEO - .move_LEO)
>
> And if there were two APIs we'd end up needing to make both kinds of query
> to each broker in the cluster. This morning I said:
>
> But AFAICS this observation doesn't really help much in terms of the APIs
> > concerned though. Since the requests would still need to go to different
> > brokers depending on which kind of movement is being performed.
> >
>
> But I wonder if that's *really* such a 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Tom Bentley
I've spent some time thinking about KIP-179 and KIP-113, the proposed
algorithms and APIs, and trying to weigh the pros and cons of various
alternative options.

I think Dong's reasons for the algorithm for inter-broker move in KIP-113
make a lot of sense. I don't think it would be at all simple to try to
change that algorithm to one where the whole thing can be triggered by a
single call to an AdminClient method. So I guess we should try to keep as
much of that algorithm as possible.

KIP-179 will need to change this step

 - The script creates reassignment znode in zookeeper.
>

with an AdminClient API call. This call can be the same one as currently
specified in KIP-179 -- reassignPartitions() -- except the argument needs
to take into account the need to pass log dirs as well as broker ids. Thus
I would suggest

ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<ReplicAssignment>> assignments)

where:

class ReplicAssignment {
    int broker()
    String logDirectory()    // can be null
}

(This is just a Java representation of the reassignment json in KIP-113,
which itself is a superset of the reassignment json currently in use)
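
For example, moving my-topic-0 onto brokers 1 and 2, pinning the replica on
broker 1 to a particular log dir and letting broker 2 pick its own, might look
like this (a sketch of the proposed API; the ReplicAssignment constructor and
the result's all() method are assumed here for illustration):

Map<TopicPartition, List<ReplicAssignment>> assignments = new HashMap<>();
assignments.put(new TopicPartition("my-topic", 0), Arrays.asList(
        new ReplicAssignment(1, "/disks/d1/kafka-logs"),   // broker 1, specific log dir
        new ReplicAssignment(2, null)));                    // broker 2, any log dir
ReassignPartitionsResult result = adminClient.reassignPartitions(assignments);
result.all().get();   // completes when the reassignment has been accepted, not when it finishes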

The corresponding protocol would look like this:

ReassignPartitionsRequest => timeout validate_only log_dirs
[topic_assignment]
  timeout => int32
  validate_only => boolean
  log_dirs => [string]
  topic_assignment => topic [partition_assignment]
topic => string
partition_assignment => partition [replica]
  partition => int32
  replica => broker log_dir_index
broker => int32
log_dir_index => int16

The purpose of log_dirs is to serialize each log dir in the request only
once. These are then referred to by index in log_dir_index. The
log_dir_index can be -1, which means the caller doesn't care which log
directory should be used on the receiving broker.
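
As a concrete example of the encoding (values made up):

// log_dirs serialized once, then referenced by index; -1 means "don't care".
String[] logDirs = { "/disks/d1/kafka-logs", "/disks/d2/kafka-logs" };
// replica entries as (broker, log_dir_index) pairs:
int[][] replicas = {
    { 1, 0 },    // broker 1 -> /disks/d1/kafka-logs
    { 2, 1 },    // broker 2 -> /disks/d2/kafka-logs
    { 3, -1 },   // broker 3 -> broker chooses the log dir itself
};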

This request can be sent to *any* broker. The broker which receives a
ReassignPartitionsRequest essentially converts it into reassignment JSON
and writes that JSON to the znode, then returns a
ReassignPartitionsResponse:

ReassignPartitionsResponse => throttle_time_ms [topic_assignment_result]
  throttle_time_ms => INT32
  log_dirs => [string]
  topic_assignment_result => topic partition_assignment_result
topic => STRING
partition_assignment_result => partition [replica_result]
  partition => int32
  replica_result => broker log_dir_index error_code
error_message
broker => int32
log_dir_index => int16
error_code => INT16
 error_message => NULLABLE_STRING

This uses the same encoding scheme for log_dirs as described above.

Meanwhile the controller is notified by ZK of the change in value of the
znode and proceeds, as currently, by sending LeaderAndIsrRequest and
StopReplicaRequest in order to complete the reassignments.

The remaining problem is around how to measure progress of reassignment. As
mentioned in the email I wrote this morning, I think we really need two
different lag calculations if we're using the lag to measure progress and
we want the property that lag=0 means reassignment has finished. The
problem with that, I now realise, is the script might be called with
reassignments which are a mix of:

* inter-broker moves without a log dir (=> use HW-replicaLEO)
* inter-broker moves with a log dir (=> use HW-replicaLEO)
* intra-broker moves with a log dir (=> use .log_LEO - .move_LEO)

And if there were two APIs we'd end up needing to make both kinds of query
to each broker in the cluster. This morning I said:

But AFAICS this observation doesn't really help much in terms of the APIs
> concerned though. Since the requests would still need to go to different
> brokers depending on which kind of movement is being performed.
>

But I wonder if that's *really* such a problem: In the case of an
inter-broker move we just need to ask the leader, and in the case of an
intra-broker move we just have to ask that broker. In general we'd need a
single request to each broker in the cluster. Then each broker would need
to service that request, but presumably it's just pulling a number out of a
ConcurrentHashMap, which is updated by the replica movement code in each of
the two cases (inter-broker and intra-broker). WDYT?
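
Something like the following is what I have in mind on the broker side (a
sketch only; apart from ConcurrentHashMap itself, the names are invented):

// Both movement paths update the same map; the status query handler only reads it.
private final java.util.concurrent.ConcurrentHashMap<TopicPartition, Long> movementLag =
        new java.util.concurrent.ConcurrentHashMap<>();

// Inter-broker path (leader tracking a reassigned follower):
movementLag.put(tp, Math.max(0L, highWatermark - followerLEO));

// Intra-broker path (".move" replica catching up with ".log"):
movementLag.put(tp, Math.max(0L, logReplicaLEO - moveReplicaLEO));

// Serving a progress query is then just a lookup:
Long lag = movementLag.get(tp);   // null when no movement is in progress for tp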

Assuming no one can see any glaring holes in what I'm proposing here, or
wants to suggest a workable alternative set of APIs and algorithms, then
I'll update KIP-179 to this effect.

Thanks for taking the time to read this far!

Tom

On 10 August 2017 at 11:56, Tom Bentley  wrote:

> Hi Dong and Jun,
>
> It seems that KIP-179 does not explicitly specify the definition of this
>> lag.
>
>
> Given that the definition of "caught up" is "is the replica in the ISR?",
> I found the code in Partition.maybeExpandIsr() which decides whether a
> replica should be added to the to the 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-10 Thread Tom Bentley
Hi Dong and Jun,

It seems that KIP-179 does not explicitly specify the definition of this
> lag.


Given that the definition of "caught up" is "is the replica in the ISR?", I
found the code in Partition.maybeExpandIsr() which decides whether a
replica should be added to the ISR, and it uses
replica.logEndOffset.offsetDiff(leaderHW)
>= 0, so for this purpose I would define the lag as max(leaderHW -
replicaLEO, 0). I think we agree this should work for inter-broker
movement, where the leader knows these quantities.
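
In other words, something like this (a simplified restatement, not the literal
broker code):

boolean caughtUp = replicaLEO >= leaderHW;          // the ISR-expansion test above
long lag = Math.max(leaderHW - replicaLEO, 0L);     // 0 exactly when the replica has caught up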

As Dong says, this doesn't work for the intra-broker case:

Note that we can not calculate lag as max(0, HW - LEO)
> because we still need the difference between two lags to measure the
> progress of intra-broker replica movement.
>

It seems to me that the intra-broker case is actually a special case of the
inter-broker case. Conceptually with an intra-broker move the ".log"
replica is the leader, the ".move" directory is the follower, the ISR is
the singleton containing the leader, and thus the HW is the LEO of the ".log".
Viewed in this way, Dong's method of leaderLEO - followerLEO is the same
thing for the intra-broker case as HW-LEO is for the inter-broker case.

But AFAICS this observation doesn't really help much in terms of the APIs
concerned though. Since the requests would still need to go to different
brokers depending on which kind of movement is being performed.

So perhaps this is another case where maybe it makes sense to keep the two
APIs separate, one API for measuring inter-broker movement progress and
another for the intra-broker case. WDYT?

Thanks for the continuing discussion on this!

Tom


On 10 August 2017 at 05:28, Dong Lin  wrote:

> Hey Jun,
>
> I have been thinking about whether it is better to return lag (i.e. HW -
> LEO) instead of LEO. Note that the lag in the DescribeDirsResponse may be
> negative if LEO > HW. It will almost always be negative for leader and
> in-sync replicas. Note that we can not calculate lag as max(0, HW - LEO)
> because we still need the difference between two lags to measure the
> progress of intra-broker replica movement. The AdminClient API can choose
> to return max(0, HW - LEO) depending on whether it is used for tracking
> progress of inter-broker reassignment or intra-broker movement. Is it OK?
> If so, I will update the KIP-113 accordingly to return lag in the
> DescribeDirsResponse .
>
> Thanks,
> Dong
>
>
>
>
> On Wed, Aug 9, 2017 at 5:06 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Yes, the lag in a replica is calculated as the difference of LEO of the
> > replica and the HW. So, as long as a replica is in sync, the lag is
> always
> > 0.
> >
> > So, I was suggesting to return lag instead of LEO in DescribeDirsResponse
> > for each replica. I am not sure if we need to return HW though.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Aug 9, 2017 at 5:01 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > It just came to me that you may be assuming that folower_lag = HW -
> > > follower_LEO. If that is the case, then we need to have new
> > > request/response to retrieve this lag since the DescribeDirsResponse
> > > doesn't even include HW. It seems that KIP-179 does not explicitly
> > specify
> > > the definition of this lag.
> > >
> > > I have been assuming that follow_lag = leader_LEO - follower_LEO given
> > that
> > > the request is used to query the reassignment status. Strictly speaking
> > the
> > > difference between leader_LEO and the HW is limited by the amount of
> data
> > > produced in KafkaConfig.replicaLagTimeMaxMs, which is 10 seconds. I
> also
> > > assumed that 10 seconds is probably not a big deal given the typical
> time
> > > length of the reassignment.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Aug 9, 2017 at 4:40 PM, Dong Lin  wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > If I understand you right, you are suggesting that, in the case when
> > > there
> > > > is continuous incoming traffic, the approach in the KIP-179 will
> report
> > > lag
> > > > as 0 whereas the approach using DescribeDirsRequest will report lag
> as
> > > > non-zero. But I think the approach in KIP-179 will also report
> non-zero
> > > lag
> > > > when there is continuous traffic. This is because at the time the
> > leader
> > > > receives ReplicaStatusRequest, it is likely that some data has been
> > > > appended to the partition after the last FetchRequest from the
> > follower.
> > > > Does this make sense?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > >
> > > > On Wed, Aug 9, 2017 at 4:24 PM, Jun Rao  wrote:
> > > >
> > > >> 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-09 Thread Dong Lin
Hey Jun,

I have been thinking about whether it is better to return lag (i.e. HW -
LEO) instead of LEO. Note that the lag in the DescribeDirsResponse may be
negative if LEO > HW. It will almost always be negative for leader and
in-sync replicas. Note that we can not calculate lag as max(0, HW - LEO)
because we still need the difference between two lags to measure the
progress of intra-broker replica movement. The AdminClient API can choose
to return max(0, HW - LEO) depending on whether it is used for tracking
progress of inter-broker reassignment or intra-broker movement. Is it OK?
If so, I will update the KIP-113 accordingly to return lag in the
DescribeDirsResponse .

Thanks,
Dong





On Wed, Aug 9, 2017 at 5:06 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Yes, the lag in a replica is calculated as the difference of LEO of the
> replica and the HW. So, as long as a replica is in sync, the lag is always
> 0.
>
> So, I was suggesting to return lag instead of LEO in DescribeDirsResponse
> for each replica. I am not sure if we need to return HW though.
>
> Thanks,
>
> Jun
>
> On Wed, Aug 9, 2017 at 5:01 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > It just came to me that you may be assuming that folower_lag = HW -
> > follower_LEO. If that is the case, then we need to have new
> > request/response to retrieve this lag since the DescribeDirsResponse
> > doesn't even include HW. It seems that KIP-179 does not explicitly
> specify
> > the definition of this lag.
> >
> > I have been assuming that follow_lag = leader_LEO - follower_LEO given
> that
> > the request is used to query the reassignment status. Strictly speaking
> the
> > difference between leader_LEO and the HW is limited by the amount of data
> > produced in KafkaConfig.replicaLagTimeMaxMs, which is 10 seconds. I also
> > assumed that 10 seconds is probably not a big deal given the typical time
> > length of the reassignment.
> >
> > Thanks,
> > Dong
> >
> > On Wed, Aug 9, 2017 at 4:40 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > If I understand you right, you are suggesting that, in the case when
> > there
> > > is continuous incoming traffic, the approach in the KIP-179 will report
> > lag
> > > as 0 whereas the approach using DescribeDirsRequest will report lag as
> > > non-zero. But I think the approach in KIP-179 will also report non-zero
> > lag
> > > when there is continuous traffic. This is because at the time the
> leader
> > > receives ReplicaStatusRequest, it is likely that some data has been
> > > appended to the partition after the last FetchRequest from the
> follower.
> > > Does this make sense?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > > On Wed, Aug 9, 2017 at 4:24 PM, Jun Rao  wrote:
> > >
> > >> Hi, Dong,
> > >>
> > >> As for whether to return LEO or lag, my point was the following. What
> > you
> > >> are concerned about is that an in-sync replica could become out of
> sync
> > >> again. However, the more common case is that once a replica is caught
> > up,
> > >> it will stay in sync afterwards. In that case, once the reassignment
> > >> process completes, if we report based on lag, all lags will be 0. If
> we
> > >> report based on Math.max(0, leaderLEO - followerLEO), the value may
> not
> > be
> > >> 0 if there is continuous incoming traffic, which will be confusing to
> > the
> > >> user.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >>
> > >> On Tue, Aug 8, 2017 at 6:26 PM, Dong Lin  wrote:
> > >>
> > >> > Hey Jun,
> > >> >
> > >> > Thanks for the comment!
> > >> >
> > >> > Yes, it should work. The tool can send request to any broker and
> > broker
> > >> can
> > >> > just write the reassignment znode. My previous intuition is that it
> > may
> > >> be
> > >> > better to only send this request to controller. But I don't have
> good
> > >> > reasons for this restriction.
> > >> >
> > >> > My intuition is that we can keep them separate as well. Becket and I
> > >> have
> > >> > discussed this both offline and in https://github.com/apache/
> > >> > kafka/pull/3621.
> > >> > Currently I don't have a strong opinion on this and I am open to
> using
> > >> only
> > >> > one API to do both if someone can come up with a reasonable API
> > >> signature
> > >> > for this method. For now I have added the method alterReplicaDir()
> in
> > >> > KafkaAdminClient instead of the AdminClient interface so that the
> > >> > reassignment script can use this method without concluding what the
> > API
> > >> > would look like in AdminClient in the future.
> > >> >
> > >> > Regarding DescribeDirsResponse, I think it is probably OK to have
> > >> 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-09 Thread Jun Rao
Hi, Dong,

Yes, the lag in a replica is calculated as the difference of LEO of the
replica and the HW. So, as long as a replica is in sync, the lag is always
0.

So, I was suggesting to return lag instead of LEO in DescribeDirsResponse
for each replica. I am not sure if we need to return HW though.

Thanks,

Jun

On Wed, Aug 9, 2017 at 5:01 PM, Dong Lin  wrote:

> Hey Jun,
>
> It just came to me that you may be assuming that folower_lag = HW -
> follower_LEO. If that is the case, then we need to have new
> request/response to retrieve this lag since the DescribeDirsResponse
> doesn't even include HW. It seems that KIP-179 does not explicitly specify
> the definition of this lag.
>
> I have been assuming that follow_lag = leader_LEO - follower_LEO given that
> the request is used to query the reassignment status. Strictly speaking the
> difference between leader_LEO and the HW is limited by the amount of data
> produced in KafkaConfig.replicaLagTimeMaxMs, which is 10 seconds. I also
> assumed that 10 seconds is probably not a big deal given the typical time
> length of the reassignment.
>
> Thanks,
> Dong
>
> On Wed, Aug 9, 2017 at 4:40 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > If I understand you right, you are suggesting that, in the case when
> there
> > is continuous incoming traffic, the approach in the KIP-179 will report
> lag
> > as 0 whereas the approach using DescribeDirsRequest will report lag as
> > non-zero. But I think the approach in KIP-179 will also report non-zero
> lag
> > when there is continuous traffic. This is because at the time the leader
> > receives ReplicaStatusRequest, it is likely that some data has been
> > appended to the partition after the last FetchRequest from the follower.
> > Does this make sense?
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Wed, Aug 9, 2017 at 4:24 PM, Jun Rao  wrote:
> >
> >> Hi, Dong,
> >>
> >> As for whether to return LEO or lag, my point was the following. What
> you
> >> are concerned about is that an in-sync replica could become out of sync
> >> again. However, the more common case is that once a replica is caught
> up,
> >> it will stay in sync afterwards. In that case, once the reassignment
> >> process completes, if we report based on lag, all lags will be 0. If we
> >> report based on Math.max(0, leaderLEO - followerLEO), the value may not
> be
> >> 0 if there is continuous incoming traffic, which will be confusing to
> the
> >> user.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >>
> >> On Tue, Aug 8, 2017 at 6:26 PM, Dong Lin  wrote:
> >>
> >> > Hey Jun,
> >> >
> >> > Thanks for the comment!
> >> >
> >> > Yes, it should work. The tool can send request to any broker and
> broker
> >> can
> >> > just write the reassignment znode. My previous intuition is that it
> may
> >> be
> >> > better to only send this request to controller. But I don't have good
> >> > reasons for this restriction.
> >> >
> >> > My intuition is that we can keep them separate as well. Becket and I
> >> have
> >> > discussed this both offline and in https://github.com/apache/
> >> > kafka/pull/3621.
> >> > Currently I don't have a strong opinion on this and I am open to using
> >> only
> >> > one API to do both if someone can come up with a reasonable API
> >> signature
> >> > for this method. For now I have added the method alterReplicaDir() in
> >> > KafkaAdminClient instead of the AdminClient interface so that the
> >> > reassignment script can use this method without concluding what the
> API
> >> > would look like in AdminClient in the future.
> >> >
> >> > Regarding DescribeDirsResponse, I think it is probably OK to have
> >> slightly
> >> > more lag. The script can calculate the lag of the follower replica as
> >> > Math.max(0, leaderLEO - followerLEO). I agree that it will be slightly
> >> less
> >> > accurate than the current approach in KIP-179. But even with the
> current
> >> > approach in KIP-179, the result provided by the script is an
> >> approximation
> >> > anyway, since there is delay from the time that leader returns
> response
> >> to
> >> > the time that the script collects response from all brokers and prints
> >> > result to user. I think if the slight difference in the accuracy
> between
> >> > the two approaches does not make a difference to the intended use-case
> >> of
> >> > this API, then we probably want to re-use the existing request/response
> >> to
> >> > keep the protocol simple.
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Aug 8, 2017 at 5:56 PM, Jun Rao  wrote:
> >> >
> >> > > Hi, Dong,
> >> > >
> >> > > I think Tom was suggesting to have the AlterTopicsRequest sent to
> any
> >> > > broker, which just writes the reassignment json to ZK. The
> controller
> >> > will
> >> > > pick up the reassignment and act on it as usual. This should work,
> >> right?
> >> > >
> >> > > Having a separate 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-09 Thread Dong Lin
Hey Jun,

It just came to me that you may be assuming that follower_lag = HW -
follower_LEO. If that is the case, then we need to have new
request/response to retrieve this lag since the DescribeDirsResponse
doesn't even include HW. It seems that KIP-179 does not explicitly specify
the definition of this lag.

I have been assuming that follower_lag = leader_LEO - follower_LEO given that
the request is used to query the reassignment status. Strictly speaking the
difference between leader_LEO and the HW is limited by the amount of data
produced in KafkaConfig.replicaLagTimeMaxMs, which is 10 seconds. I also
assumed that 10 seconds is probably not a big deal given the typical time
length of the reassignment.

Thanks,
Dong

On Wed, Aug 9, 2017 at 4:40 PM, Dong Lin  wrote:

> Hey Jun,
>
> If I understand you right, you are suggesting that, in the case when there
> is continuous incoming traffic, the approach in the KIP-179 will report lag
> as 0 whereas the approach using DescribeDirsRequest will report lag as
> non-zero. But I think the approach in KIP-179 will also report non-zero lag
> when there is continuous traffic. This is because at the time the leader
> receives ReplicaStatusRequest, it is likely that some data has been
> appended to the partition after the last FetchRequest from the follower.
> Does this make sense?
>
> Thanks,
> Dong
>
>
>
> On Wed, Aug 9, 2017 at 4:24 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> As for whether to return LEO or lag, my point was the following. What you
>> are concerned about is that an in-sync replica could become out of sync
>> again. However, the more common case is that once a replica is caught up,
>> it will stay in sync afterwards. In that case, once the reassignment
>> process completes, if we report based on lag, all lags will be 0. If we
>> report based on Math.max(0, leaderLEO - followerLEO), the value may not be
>> 0 if there is continuous incoming traffic, which will be confusing to the
>> user.
>>
>> Thanks,
>>
>> Jun
>>
>>
>>
>> On Tue, Aug 8, 2017 at 6:26 PM, Dong Lin  wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks for the comment!
>> >
>> > Yes, it should work. The tool can send request to any broker and broker
>> can
>> > just write the reassignment znode. My previous intuition is that it may
>> be
>> > better to only send this request to controller. But I don't have good
>> > reasons for this restriction.
>> >
>> > My intuition is that we can keep them separate as well. Becket and I
>> have
>> > discussed this both offline and in https://github.com/apache/
>> > kafka/pull/3621.
>> > Currently I don't have a strong opinion on this and I am open to using
>> only
>> > one API to do both if someone can come up with a reasonable API
>> signature
>> > for this method. For now I have added the method alterReplicaDir() in
>> > KafkaAdminClient instead of the AdminClient interface so that the
>> > reassignment script can use this method without concluding what the API
>> > would look like in AdminClient in the future.
>> >
>> > Regarding DescribeDirsResponse, I think it is probably OK to have
>> slightly
>> > more lag. The script can calculate the lag of the follower replica as
>> > Math.max(0, leaderLEO - followerLEO). I agree that it will be slightly
>> less
>> > accurate than the current approach in KIP-179. But even with the current
>> > approach in KIP-179, the result provided by the script is an
>> approximation
>> > anyway, since there is delay from the time that leader returns response
>> to
>> > the time that the script collects response from all brokers and prints
>> > result to user. I think if the slight difference in the accuracy between
>> > the two approaches does not make a difference to the intended use-case
>> of
>> > this API, then we probably want to re-use the existing request/response
>> to
>> > keep the protocol simple.
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> >
>> >
>> >
>> > On Tue, Aug 8, 2017 at 5:56 PM, Jun Rao  wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > I think Tom was suggesting to have the AlterTopicsRequest sent to any
>> > > broker, which just writes the reassignment json to ZK. The controller
>> > will
>> > > pick up the reassignment and act on it as usual. This should work,
>> right?
>> > >
>> > > Having a separate AlterTopicsRequest and AlterReplicaDirRequest seems
>> > > simpler to me. The former is handled by the controller and the latter
>> is
>> > > handled by the affected broker. They don't always have to be done
>> > together.
>> > > Merging the two into a single request probably will make both the api
>> and
>> > > the implementation a bit more complicated. If we do keep the two
>> separate
>> > > requests, it seems that we should just add AlterReplicaDirRequest to
>> the
>> > > AdminClient interface?
>> > >
>> > > Now, regarding DescribeDirsResponse. I agree that it can be used for
>> the
>> > > status reporting in KIP-179 as well. However, it seems 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-09 Thread Dong Lin
Hey Jun,

If I understand you right, you are suggesting that, in the case when there
is continuous incoming traffic, the approach in the KIP-179 will report lag
as 0 whereas the approach using DescribeDirsRequest will report lag as
non-zero. But I think the approach in KIP-179 will also report non-zero lag
when there is continuous traffic. This is because at the time the leader
receives ReplicaStatusRequest, it is likely that some data has been
appended to the partition after the last FetchRequest from the follower.
Does this make sense?

Thanks,
Dong



On Wed, Aug 9, 2017 at 4:24 PM, Jun Rao  wrote:

> Hi, Dong,
>
> As for whether to return LEO or lag, my point was the following. What you
> are concerned about is that an in-sync replica could become out of sync
> again. However, the more common case is that once a replica is caught up,
> it will stay in sync afterwards. In that case, once the reassignment
> process completes, if we report based on lag, all lags will be 0. If we
> report based on Math.max(0, leaderLEO - followerLEO), the value may not be
> 0 if there is continuous incoming traffic, which will be confusing to the
> user.
>
> Thanks,
>
> Jun
>
>
>
> On Tue, Aug 8, 2017 at 6:26 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for the comment!
> >
> > Yes, it should work. The tool can send request to any broker and broker
> can
> > just write the reassignment znode. My previous intuition is that it may
> be
> > better to only send this request to controller. But I don't have good
> > reasons for this restriction.
> >
> > My intuition is that we can keep them separate as well. Becket and I have
> > discussed this both offline and in https://github.com/apache/
> > kafka/pull/3621.
> > Currently I don't have a strong opinion on this and I am open to using
> only
> > one API to do both if someone can come up with a reasonable API signature
> > for this method. For now I have added the method alterReplicaDir() in
> > KafkaAdminClient instead of the AdminClient interface so that the
> > reassignment script can use this method without concluding what the API
> > would look like in AdminClient in the future.
> >
> > Regarding DescribeDirsResponse, I think it is probably OK to have
> slightly
> > more lag. The script can calculate the lag of the follower replica as
> > Math.max(0, leaderLEO - followerLEO). I agree that it will be slightly
> less
> > accurate than the current approach in KIP-179. But even with the current
> > approach in KIP-179, the result provided by the script is an
> approximation
> > anyway, since there is delay from the time that leader returns response
> to
> > the time that the script collects response from all brokers and prints
> > result to user. I think if the slight difference in the accuracy between
> > the two approaches does not make a difference to the intended use-case of
> > this API, then we probably want to re-use the existing request/response to
> > keep the protocol simple.
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> >
> > On Tue, Aug 8, 2017 at 5:56 PM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > I think Tom was suggesting to have the AlterTopicsRequest sent to any
> > > broker, which just writes the reassignment json to ZK. The controller
> > will
> > > pick up the reassignment and act on it as usual. This should work,
> right?
> > >
> > > Having a separate AlterTopicsRequest and AlterReplicaDirRequest seems
> > > simpler to me. The former is handled by the controller and the latter
> is
> > > handled by the affected broker. They don't always have to be done
> > together.
> > > Merging the two into a single request probably will make both the api
> and
> > > the implementation a bit more complicated. If we do keep the two
> separate
> > > requests, it seems that we should just add AlterReplicaDirRequest to
> the
> > > AdminClient interface?
> > >
> > > Now, regarding DescribeDirsResponse. I agree that it can be used for
> the
> > > status reporting in KIP-179 as well. However, it seems that reporting
> the
> > > log end offset of each replica may not be easy to use. The log end
> offset
> > > will be returned from different brokers in slightly different time. If
> > > there is continuous producing traffic, the difference in log end offset
> > > between the leader and the follower could be larger than 0 even if the
> > > follower has fully caught up. I am wondering if it's better to instead
> > > return the lag in offset per replica. This way, the status can probably
> > be
> > > reported more reliably.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Aug 8, 2017 at 11:23 AM, Dong Lin  wrote:
> > >
> > > > Hey Tom,
> > > >
> > > > Thanks for the quick reply. Please see my comment inline.
> > > >
> > > > On Tue, Aug 8, 2017 at 11:06 AM, Tom Bentley 
> > > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Replies inline, as usual
> > > > >
> > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-09 Thread Jun Rao
Hi, Dong,

As for whether to return LEO or lag, my point was the following. What you
are concerned about is that an in-sync replica could become out of sync
again. However, the more common case is that once a replica is caught up,
it will stay in sync afterwards. In that case, once the reassignment
process completes, if we report based on lag, all lags will be 0. If we
report based on Math.max(0, leaderLEO - followerLEO), the value may not be
0 if there is continuous incoming traffic, which will be confusing to the
user.

Thanks,

Jun



On Tue, Aug 8, 2017 at 6:26 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for the comment!
>
> Yes, it should work. The tool can send request to any broker and broker can
> just write the reassignment znode. My previous intuition is that it may be
> better to only send this request to controller. But I don't have good
> reasons for this restriction.
>
> My intuition is that we can keep them separate as well. Becket and I have
> discussed this both offline and in https://github.com/apache/
> kafka/pull/3621.
> Currently I don't have a strong opinion on this and I am open to using only
> one API to do both if someone can come up with a reasonable API signature
> for this method. For now I have added the method alterReplicaDir() in
> KafkaAdminClient instead of the AdminClient interface so that the
> reassignment script can use this method without concluding what the API
> would look like in AdminClient in the future.
>
> Regarding DescribeDirsResponse, I think it is probably OK to have slightly
> more lag. The script can calculate the lag of the follower replica as
> Math.max(0, leaderLEO - followerLEO). I agree that it will be slightly less
> accurate than the current approach in KIP-179. But even with the current
> approach in KIP-179, the result provided by the script is an approximation
> anyway, since there is delay from the time that leader returns response to
> the time that the script collects response from all brokers and prints
> result to user. I think if the slight difference in the accuracy between
> the two approaches does not make a difference to the intended use-case of
> this API, then we probably want to re-use the existing request/response to
> keep the protocol simple.
>
> Thanks,
> Dong
>
>
>
>
>
> On Tue, Aug 8, 2017 at 5:56 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > I think Tom was suggesting to have the AlterTopicsRequest sent to any
> > broker, which just writes the reassignment json to ZK. The controller
> will
> > pick up the reassignment and act on it as usual. This should work, right?
> >
> > Having a separate AlterTopicsRequest and AlterReplicaDirRequest seems
> > simpler to me. The former is handled by the controller and the latter is
> > handled by the affected broker. They don't always have to be done
> together.
> > Merging the two into a single request probably will make both the api and
> > the implementation a bit more complicated. If we do keep the two separate
> > requests, it seems that we should just add AlterReplicaDirRequest to the
> > AdminClient interface?
> >
> > Now, regarding DescribeDirsResponse. I agree that it can be used for the
> > status reporting in KIP-179 as well. However, it seems that reporting the
> > log end offset of each replica may not be easy to use. The log end offset
> > will be returned from different brokers in slightly different time. If
> > there is continuous producing traffic, the difference in log end offset
> > between the leader and the follower could be larger than 0 even if the
> > follower has fully caught up. I am wondering if it's better to instead
> > return the lag in offset per replica. This way, the status can probably
> be
> > reported more reliably.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Aug 8, 2017 at 11:23 AM, Dong Lin  wrote:
> >
> > > Hey Tom,
> > >
> > > Thanks for the quick reply. Please see my comment inline.
> > >
> > > On Tue, Aug 8, 2017 at 11:06 AM, Tom Bentley 
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Replies inline, as usual
> > > >
> > > > > As I originally envisaged it, KIP-179's support for reassigning
> > > > partitions
> > > > >
> > > > > would have more-or-less taken the logic currently in the
> > > > > > ReassignPartitionsCommand (that is, writing JSON to the
> > > > > > ZkUtils.ReassignPartitionsPath)
> > > > > > and put it behind a suitable network protocol API. Thus it
> wouldn't
> > > > > matter
> > > > > > which broker received the protocol call: It would be acted on by
> > > > brokers
> > > > > > being notified of the change in the ZK path, just as currently.
> > This
> > > > > would
> > > > > > have kept the ReassignPartitionsCommand relatively simple, as it
> > > > > currently
> > > > > > is.
> > > > > >
> > > > >
> > > > > I am not sure I fully understand your proposal. I think you are
> > saying
> > > > that
> > > > > any broker can receive and handle the 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-09 Thread Tom Bentley
Hi Dong and Jun,

Thanks for your responses!

Jun's interpretation of how AlterTopicsRequest could be sent to any broker
is indeed what I meant. Since the data has to get persisted in ZK anyway,
it doesn't really matter whether we send it to the controller (it will still
have to write it to the znode). And while we still support --zookeeper, the
controller will have to remain a listener to that znode anyway. Requiring
the AlterTopicsRequest to be sent to the controller might make sense if we
can foresee some way to take ZK out of the equation in the future (a la
KIP-183). But I think we have to keep ZK in the picture to make it
resilient, and therefore I see no value in requiring the receiver of the
AlterTopicsRequest to be the controller.

I will have a go at putting together a "unified API" (for reassigning partitions
between brokers and to particular log directories), so we have something
concrete to discuss, though we may well conclude separate APIs make more
sense.
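
To give that discussion a concrete starting point, one purely hypothetical
shape such a unified call could take is sketched below; none of these names
come from KIP-113 or KIP-179:

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    // Hypothetical sketch only: a single call that carries both the target brokers
    // and, optionally, a log directory per replica.
    interface UnifiedReassignApi {
        final class Placement {
            final int brokerId;
            final Optional<String> logDir; // empty means "any log dir on that broker"
            Placement(int brokerId, Optional<String> logDir) {
                this.brokerId = brokerId;
                this.logDir = logDir;
            }
        }
        // Key is "topic-partition" (e.g. "my-topic-0"); purely illustrative.
        void reassignPartitions(Map<String, List<Placement>> plan);
    }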

Finally about measuring progress, Dong said:

I think if the slight difference in the accuracy between
> the two approaches does not make a difference to the intended use-case of
> this API


Lacking data to evaluate the "if", I guess we could go with
DescribeDirsResponse and change it in a future KIP if it turned out to be
inadequate. But if anyone is able to give insight into what the difference
is, that would be better.

Thanks again for the feedback.


On 9 August 2017 at 02:26, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for the comment!
>
> Yes, it should work. The tool can send request to any broker and broker can
> just write the reassignment znode. My previous intuition is that it may be
> better to only send this request to controller. But I don't have good
> reasons for this restriction.
>
> My intuition is that we can keep them separate as well. Becket and I have
> discussed this both offline and in https://github.com/apache/
> kafka/pull/3621.
> Currently I don't have a strong opinion on this and I am open to using only
> one API to do both if someone can come up with a reasonable API signature
> for this method. For now I have added the method alterReplicaDir() in
> KafkaAdminClient instead of the AdminClient interface so that the
> reassignment script can use this method without concluding what the API
> would look like in AdminClient in the future.
>
> Regarding DescribeDirsResponse, I think it is probably OK to have slightly
> more lag. The script can calculate the lag of the follower replica as
> Math.max(0, leaderLEO - followerLEO). I agree that it will be slightly less
> accurate than the current approach in KIP-179. But even with the current
> approach in KIP-179, the result provided by the script is an approximation
> anyway, since there is delay from the time that leader returns response to
> the time that the script collects response from all brokers and prints
> result to user. I think if the slight difference in the accuracy between
> the two approaches does not make a difference to the intended use-case of
> this API, then we probably want to re-use the existing request/response to
> keep the protocol simple.
>
> Thanks,
> Dong
>
>
>
>
>
> On Tue, Aug 8, 2017 at 5:56 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > I think Tom was suggesting to have the AlterTopicsRequest sent to any
> > broker, which just writes the reassignment json to ZK. The controller
> will
> > pick up the reassignment and act on it as usual. This should work, right?
> >
> > Having a separate AlterTopicsRequest and AlterReplicaDirRequest seems
> > simpler to me. The former is handled by the controller and the latter is
> > handled by the affected broker. They don't always have to be done
> together.
> > Merging the two into a single request probably will make both the api and
> > the implementation a bit more complicated. If we do keep the two separate
> > requests, it seems that we should just add AlterReplicaDirRequest to the
> > AdminClient interface?
> >
> > Now, regarding DescribeDirsResponse. I agree that it can be used for the
> > status reporting in KIP-179 as well. However, it seems that reporting the
> > log end offset of each replica may not be easy to use. The log end offset
> > will be returned from different brokers in slightly different time. If
> > there is continuous producing traffic, the difference in log end offset
> > between the leader and the follower could be larger than 0 even if the
> > follower has fully caught up. I am wondering if it's better to instead
> > return the lag in offset per replica. This way, the status can probably
> be
> > reported more reliably.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Aug 8, 2017 at 11:23 AM, Dong Lin  wrote:
> >
> > > Hey Tom,
> > >
> > > Thanks for the quick reply. Please see my comment inline.
> > >
> > > On Tue, Aug 8, 2017 at 11:06 AM, Tom Bentley 
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Replies 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-08 Thread Dong Lin
Hey Jun,

Thanks for the comment!

Yes, it should work. The tool can send the request to any broker, and the
broker can just write the reassignment znode. My previous intuition was that
it may be better to only send this request to the controller, but I don't
have good reasons for this restriction.

My intuition is that we can keep them separate as well. Becket and I have
discussed this both offline and in https://github.com/apache/kafka/pull/3621.
Currently I don't have a strong opinion on this, and I am open to using only
one API to do both if someone can come up with a reasonable API signature
for this method. For now I have added the method alterReplicaDir() in
KafkaAdminClient instead of the AdminClient interface, so that the
reassignment script can use this method without committing to what the API
would look like in AdminClient in the future.
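
For readers following along, one assumed shape for that helper is sketched
below; the actual signature in KafkaAdminClient may differ:

    import java.util.Map;

    // Assumed shape only -- the method actually added to KafkaAdminClient may differ.
    // Each entry maps a replica (identified here by a "topic-partition-brokerId"
    // string, e.g. "my-topic-0-1") to the log directory it should be moved to.
    interface ReplicaDirAdmin {
        void alterReplicaDir(Map<String, String> replicaToTargetLogDir);
    }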

Regarding DescribeDirsResponse, I think it is probably OK to have slightly
more lag. The script can calculate the lag of the follower replica as
Math.max(0, leaderLEO - followerLEO). I agree that it will be slightly less
accurate than the current approach in KIP-179. But even with the current
approach in KIP-179, the result provided by the script is an approximation
anyway, since there is a delay from the time the leader returns a response
to the time the script collects the responses from all brokers and prints
the result to the user. I think if the slight difference in accuracy between
the two approaches does not make a difference to the intended use-case of
this API, then we probably want to re-use the existing request/response to
keep the protocol simple.

Thanks,
Dong





On Tue, Aug 8, 2017 at 5:56 PM, Jun Rao  wrote:

> Hi, Dong,
>
> I think Tom was suggesting to have the AlterTopicsRequest sent to any
> broker, which just writes the reassignment json to ZK. The controller will
> pick up the reassignment and act on it as usual. This should work, right?
>
> Having a separate AlterTopicsRequest and AlterReplicaDirRequest seems
> simpler to me. The former is handled by the controller and the latter is
> handled by the affected broker. They don't always have to be done together.
> Merging the two into a single request probably will make both the api and
> the implementation a bit more complicated. If we do keep the two separate
> requests, it seems that we should just add AlterReplicaDirRequest to the
> AdminClient interface?
>
> Now, regarding DescribeDirsResponse. I agree that it can be used for the
> status reporting in KIP-179 as well. However, it seems that reporting the
> log end offset of each replica may not be easy to use. The log end offset
> will be returned from different brokers in slightly different time. If
> there is continuous producing traffic, the difference in log end offset
> between the leader and the follower could be larger than 0 even if the
> follower has fully caught up. I am wondering if it's better to instead
> return the lag in offset per replica. This way, the status can probably be
> reported more reliably.
>
> Thanks,
>
> Jun
>
> On Tue, Aug 8, 2017 at 11:23 AM, Dong Lin  wrote:
>
> > Hey Tom,
> >
> > Thanks for the quick reply. Please see my comment inline.
> >
> > On Tue, Aug 8, 2017 at 11:06 AM, Tom Bentley 
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Replies inline, as usual
> > >
> > > > As I originally envisaged it, KIP-179's support for reassigning
> > > partitions
> > > >
> > > > would have more-or-less taken the logic currently in the
> > > > > ReassignPartitionsCommand (that is, writing JSON to the
> > > > > ZkUtils.ReassignPartitionsPath)
> > > > > and put it behind a suitable network protocol API. Thus it wouldn't
> > > > matter
> > > > > which broker received the protocol call: It would be acted on by
> > > brokers
> > > > > being notified of the change in the ZK path, just as currently.
> This
> > > > would
> > > > > have kept the ReassignPartitionsCommand relatively simple, as it
> > > > currently
> > > > > is.
> > > > >
> > > >
> > > > I am not sure I fully understand your proposal. I think you are
> saying
> > > that
> > > > any broker can receive and handle the AlterTopicRequest.
> > >
> > >
> > > That's right.
> > >
> > >
> > > > Let's say a
> > > > non-controller broker received AlterTopicRequest, is this broker
> going
> > to
> > > > send LeaderAndIsrRequest to other brokers? Or is this broker create
> the
> > > > reassignment znode in zookeeper?
> > >
> > >
> > > Exactly: It's going to write some JSON to the relevant znode. Other
> > brokers
> > > will get notified by zk when the contents of this znode changes, and do
> > as
> > > they do now. This is what the tool/script does now.
> > >
> > > I will confess that I don't completely understand the role of
> > > LeaderAndIsrRequest, since the current code just seems to write to the
> > > znode to get the brokers to do the reassignment. If you could explain
> the
> > > role of LeaderAndIsrRequest that would be great.
> > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-08 Thread Jun Rao
Hi, Dong,

I think Tom was suggesting to have the AlterTopicsRequest sent to any
broker, which just writes the reassignment json to ZK. The controller will
pick up the reassignment and act on it as usual. This should work, right?

Having a separate AlterTopicsRequest and AlterReplicaDirRequest seems
simpler to me. The former is handled by the controller and the latter is
handled by the affected broker. They don't always have to be done together.
Merging the two into a single request probably will make both the api and
the implementation a bit more complicated. If we do keep the two separate
requests, it seems that we should just add AlterReplicaDirRequest to the
AdminClient interface?

Now, regarding DescribeDirsResponse. I agree that it can be used for the
status reporting in KIP-179 as well. However, it seems that reporting the
log end offset of each replica may not be easy to use. The log end offset
will be returned from different brokers at slightly different times. If
there is continuous producing traffic, the difference in log end offset
between the leader and the follower could be larger than 0 even if the
follower has fully caught up. I am wondering if it's better to instead
return the lag in offset per replica. This way, the status can probably be
reported more reliably.

Thanks,

Jun

On Tue, Aug 8, 2017 at 11:23 AM, Dong Lin  wrote:

> Hey Tom,
>
> Thanks for the quick reply. Please see my comment inline.
>
> On Tue, Aug 8, 2017 at 11:06 AM, Tom Bentley 
> wrote:
>
> > Hi Dong,
> >
> > Replies inline, as usual
> >
> > > As I originally envisaged it, KIP-179's support for reassigning
> > partitions
> > >
> > > would have more-or-less taken the logic currently in the
> > > > ReassignPartitionsCommand (that is, writing JSON to the
> > > > ZkUtils.ReassignPartitionsPath)
> > > > and put it behind a suitable network protocol API. Thus it wouldn't
> > > matter
> > > > which broker received the protocol call: It would be acted on by
> > brokers
> > > > being notified of the change in the ZK path, just as currently. This
> > > would
> > > > have kept the ReassignPartitionsCommand relatively simple, as it
> > > currently
> > > > is.
> > > >
> > >
> > > I am not sure I fully understand your proposal. I think you are saying
> > that
> > > any broker can receive and handle the AlterTopicRequest.
> >
> >
> > That's right.
> >
> >
> > > Let's say a
> > > non-controller broker received AlterTopicRequest, is this broker going
> to
> > > send LeaderAndIsrRequest to other brokers? Or is this broker create the
> > > reassignment znode in zookeeper?
> >
> >
> > Exactly: It's going to write some JSON to the relevant znode. Other
> brokers
> > will get notified by zk when the contents of this znode changes, and do
> as
> > they do now. This is what the tool/script does now.
> >
> > I will confess that I don't completely understand the role of
> > LeaderAndIsrRequest, since the current code just seems to write to the
> > znode to get the brokers to do the reassignment. If you could explain the
> > role of LeaderAndIsrRequest that would be great.
> >
>
> Currently only the controller will listen to the reassignment znode and
> sends LeaderAndIsrRequest and StopReplicaRequest to brokers in order to
> complete reassignment. Brokers won't need to listen to zookeeper for any
> reassignment -- brokers only react to requests from the controller.
> Currently Kafka's design relies a lot on the controller to keep a
> consistent view of who are the leader of partitions and what is the ISR
> etc. It will be a pretty drastic change, if not impossible, for the script
> to reassign partitions without going through controller.
>
> Thus I think it is likely that your AlterTopicsRequest can only be sent to
> controller. Then the controller can create the reassignment znode in
> zookeeper so that the information is persisted across controller fail over.
> I haven't thought this through in detail though.
>
>
>
> >
> >
> > > I may have missed it. But I couldn't find
> > > the explanation of AlterTopicRequest handling in KIP-179.
> > >
> >
> > You're right, it doesn't go into that much detail. I will fix that.
> >
> >
> > > >
> > > > KIP-113 is obviously seeking to make more radical changes. The
> > algorithm
> > > > described for moving a replica to a particular directory on a
> different
> > > > broker (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-2%29Howtoreassignreplicabetweenlogdirectoriesacrossbrokers
> > > > )
> > > > involves both sending AlterReplicaDirRequest to "the" broker (the
> > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-08 Thread Dong Lin
Hey Tom,

Thanks for the quick reply. Please see my comment inline.

On Tue, Aug 8, 2017 at 11:06 AM, Tom Bentley  wrote:

> Hi Dong,
>
> Replies inline, as usual
>
> > As I originally envisaged it, KIP-179's support for reassigning
> partitions
> >
> > would have more-or-less taken the logic currently in the
> > > ReassignPartitionsCommand (that is, writing JSON to the
> > > ZkUtils.ReassignPartitionsPath)
> > > and put it behind a suitable network protocol API. Thus it wouldn't
> > matter
> > > which broker received the protocol call: It would be acted on by
> brokers
> > > being notified of the change in the ZK path, just as currently. This
> > would
> > > have kept the ReassignPartitionsCommand relatively simple, as it
> > currently
> > > is.
> > >
> >
> > I am not sure I fully understand your proposal. I think you are saying
> that
> > any broker can receive and handle the AlterTopicRequest.
>
>
> That's right.
>
>
> > Let's say a
> > non-controller broker received AlterTopicRequest, is this broker going to
> > send LeaderAndIsrRequest to other brokers? Or is this broker create the
> > reassignment znode in zookeeper?
>
>
> Exactly: It's going to write some JSON to the relevant znode. Other brokers
> will get notified by zk when the contents of this znode changes, and do as
> they do now. This is what the tool/script does now.
>
> I will confess that I don't completely understand the role of
> LeaderAndIsrRequest, since the current code just seems to write to the
> znode to get the brokers to do the reassignment. If you could explain the
> role of LeaderAndIsrRequest that would be great.
>

Currently only the controller listens to the reassignment znode and sends
LeaderAndIsrRequest and StopReplicaRequest to brokers in order to complete
the reassignment. Brokers don't need to listen to zookeeper for any
reassignment -- brokers only react to requests from the controller.
Currently Kafka's design relies a lot on the controller to keep a consistent
view of who the leader of each partition is, what the ISR is, and so on. It
would be a pretty drastic change, if not impossible, for the script to
reassign partitions without going through the controller.

Thus I think it is likely that your AlterTopicsRequest can only be sent to
the controller. Then the controller can create the reassignment znode in
zookeeper so that the information is persisted across controller failover.
I haven't thought this through in detail though.
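
A rough illustration of the flow described above, with all type and method
names as stand-ins rather than actual controller code:

    import java.util.List;

    // Stand-in sketch: only the controller watches the reassignment znode and
    // drives the brokers with LeaderAndIsrRequest and StopReplicaRequest.
    interface ControllerReassignmentFlow {
        interface PartitionMove {
            List<Integer> addedReplicas();
            List<Integer> removedReplicas();
        }
        void sendLeaderAndIsrRequest(List<Integer> brokerIds);
        void sendStopReplicaRequest(List<Integer> brokerIds);

        default void onReassignmentZnodeChange(List<PartitionMove> plan) {
            for (PartitionMove move : plan) {
                sendLeaderAndIsrRequest(move.addedReplicas());  // new replicas start fetching
                // ... once the new replicas are in sync ...
                sendStopReplicaRequest(move.removedReplicas()); // retire the old replicas
            }
        }
    }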



>
>
> > I may have missed it. But I couldn't find
> > the explanation of AlterTopicRequest handling in KIP-179.
> >
>
> You're right, it doesn't go into that much detail. I will fix that.
>
>
> > >
> > > KIP-113 is obviously seeking to make more radical changes. The
> algorithm
> > > described for moving a replica to a particular directory on a different
> > > broker (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-2%29Howtoreassignreplicabetweenlogdirectoriesacrossbrokers
> > > )
> > > involves both sending AlterReplicaDirRequest to "the" broker (the
> > receiving
> > > broker, I assume, but it's not spelled out), _as well as_ writing to
> the
> > ZK
> > > node.
> > >
> > > This assumes the script (ReassignPartitionsCommand) has direct access
> to
> > > ZooKeeper, which is what KIP-179 is seeking to deprecate. It seems a
> > waste
> > > of time to put the logic in the script as part of KIP-113, only for
> > KIP-179
> > > to have to move it back to the controller.
> > >
> >
> > I am not sure I understand what you mean by "It seems a waste of time to
> > put the logic in the script as part of KIP-113, only for KIP-179 to have
> to
> > move it back to the controller".
>
>
> Sorry, I misunderstood slightly what you were proposing in KIP-113, so the
> "waste of time" comment isn't quite right, but I'm still not convinced that
> KIP-113+KIP-179 (in its current form) ends with a satisfactory result.
>
> Let me elaborate... KIP-113 says that to support reassigning replica
> between log directories across brokers:
> * ...
> * The script sends AlterReplicaDirRequest to those brokers which need to
> move replicas...
> * The script creates reassignment znode in zookeeper.
> * The script retries AlterReplicaDirRequest to those broker...
> * ...
>
> So the ReassignPartitionsCommand still talks to ZK directly, but now it's
> bracketed by calls to the AdminClient. KIP-179 could replace that talking
> to ZK directly with a new call to the AdminClient. But then we've got a
> pretty weird API, where we have to make three AdminClient calls (two of
> them to the same method), to move a replica. I don't really understand 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-08 Thread Tom Bentley
Hi Dong,

Replies inline, as usual

> As I originally envisaged it, KIP-179's support for reassigning partitions
>
> would have more-or-less taken the logic currently in the
> > ReassignPartitionsCommand (that is, writing JSON to the
> > ZkUtils.ReassignPartitionsPath)
> > and put it behind a suitable network protocol API. Thus it wouldn't
> matter
> > which broker received the protocol call: It would be acted on by brokers
> > being notified of the change in the ZK path, just as currently. This
> would
> > have kept the ReassignPartitionsCommand relatively simple, as it
> currently
> > is.
> >
>
> I am not sure I fully understand your proposal. I think you are saying that
> any broker can receive and handle the AlterTopicRequest.


That's right.


> Let's say a
> non-controller broker received AlterTopicRequest, is this broker going to
> send LeaderAndIsrRequest to other brokers? Or is this broker create the
> reassignment znode in zookeeper?


Exactly: It's going to write some JSON to the relevant znode. Other brokers
will get notified by zk when the contents of this znode changes, and do as
they do now. This is what the tool/script does now.

I will confess that I don't completely understand the role of
LeaderAndIsrRequest, since the current code just seems to write to the
znode to get the brokers to do the reassignment. If you could explain the
role of LeaderAndIsrRequest that would be great.


> I may have missed it. But I couldn't find
> the explanation of AlterTopicRequest handling in KIP-179.
>

You're right, it doesn't go into that much detail. I will fix that.


> >
> > KIP-113 is obviously seeking to make more radical changes. The algorithm
> > described for moving a replica to a particular directory on a different
> > broker (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-2%29Howtoreassignreplicabetweenlogdirectoriesacrossbrokers
> > )
> > involves both sending AlterReplicaDirRequest to "the" broker (the
> receiving
> > broker, I assume, but it's not spelled out), _as well as_ writing to the
> ZK
> > node.
> >
> > This assumes the script (ReassignPartitionsCommand) has direct access to
> > ZooKeeper, which is what KIP-179 is seeking to deprecate. It seems a
> waste
> > of time to put the logic in the script as part of KIP-113, only for
> KIP-179
> > to have to move it back to the controller.
> >
>
> I am not sure I understand what you mean by "It seems a waste of time to
> put the logic in the script as part of KIP-113, only for KIP-179 to have to
> move it back to the controller".


Sorry, I misunderstood slightly what you were proposing in KIP-113, so the
"waste of time" comment isn't quite right, but I'm still not convinced that
KIP-113+KIP-179 (in its current form) ends with a satisfactory result.

Let me elaborate... KIP-113 says that to support reassigning replica
between log directories across brokers:
* ...
* The script sends AlterReplicaDirRequest to those brokers which need to
move replicas...
* The script creates reassignment znode in zookeeper.
* The script retries AlterReplicaDirRequest to those broker...
* ...

So the ReassignPartitionsCommand still talks to ZK directly, but now it's
bracketed by calls to the AdminClient. KIP-179 could replace that talking
to ZK directly with a new call to the AdminClient. But then we've got a
pretty weird API, where we have to make three AdminClient calls (two of
them to the same method), to move a replica. I don't really understand why
the admin client can't present a single API method to achieve this, and
encapsulate on the server side the careful sequence of events necessary to
coordinate the movement. I understood this position is what Ismael was
advocating when he said it was better to put the logic in the controller
than spread between the script and the controller. But maybe I
misunderstood him.
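
A sketch of the script-side sequence being described, assuming placeholder
interfaces for the AdminClient and ZooKeeper calls (they are not the real
APIs):

    import java.util.Map;

    // Placeholder sketch of the sequence KIP-113 assigns to the script;
    // AdminLike and ZkLike are stand-ins, not the real AdminClient or ZkUtils.
    final class ScriptSideSequence {
        interface AdminLike { void alterReplicaDir(Map<String, String> replicaToLogDir); }
        interface ZkLike { void writeReassignmentZnode(String reassignmentJson); }

        static void moveReplicasAcrossBrokers(AdminLike admin, ZkLike zk,
                                              Map<String, String> replicaToLogDir,
                                              String reassignmentJson) {
            admin.alterReplicaDir(replicaToLogDir);      // 1. tell receiving brokers the target dirs
            zk.writeReassignmentZnode(reassignmentJson); // 2. trigger the controller-driven move
            admin.alterReplicaDir(replicaToLogDir);      // 3. retry, for replicas created only after step 1
        }
    }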



> I assume that the logic you mentioned is
> "movement of replica to the specified log directory". This logic (or the
> implementation of this logic) resides mainly in the KafkaAdminClient and
> broker. The script only needs to parse the json file as appropriate and
> call the new API in AdminClient as appropriate. The logic in the script is
> therefore not much and can be easily moved to other classes if needed.
>
> Can you clarify why this logic, i.e. movement of replica to the specified
> log directory, needs to be moved to controller in KIP-179? I think it can
> still be done in the script and controller should not need to worry about
> log directory of any replica.
>
> Thanks,
> Dong
>


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-08 Thread Dong Lin
Thanks for your reply.

Yes, my original idea is that users can continue to collect the static
information for reassignment as they do now. It is the status quo. I agree it
can be beneficial to have a tool in Kafka to collect the other information
that may be needed for reassignment, so that users would not need external
tools at all. We can discuss this later when someone wants to lead the design
and discussion of such a KIP.

I have some questions inline.

On Tue, Aug 8, 2017 at 3:31 AM, Tom Bentley  wrote:

> > >
> > > Also, how do you think things would work in the context of KIP-179?
> Would
> > > the tool still invoke these requests or would it be done by the broker
> > > receiving the alterTopics/reassignPartitions protocol call?
> > >
> >
> > My gut feel is that the tool will still invoke these requests. But I
> have a
> > few questions to KIP-179 before I can answer this question. For example,
> is
> > AlterTopicsRequest request sent to controller only? If the new assignment
> > is not written in zookeeper, how is this information propagated to the
> new
> > controller if the previous controller dies after it receives
> > AlterTopicsRequest but before it sends LeaderAndIsrRequest? I can post
> > these questions in that discussion thread later.
> >
> >
> Let me answer here (though it's relevant to both KIPs):
>
> As I originally envisaged it, KIP-179's support for reassigning partitions
> would have more-or-less taken the logic currently in the
> ReassignPartitionsCommand (that is, writing JSON to the
> ZkUtils.ReassignPartitionsPath)
> and put it behind a suitable network protocol API. Thus it wouldn't matter
> which broker received the protocol call: It would be acted on by brokers
> being notified of the change in the ZK path, just as currently. This would
> have kept the ReassignPartitionsCommand relatively simple, as it currently
> is.
>

I am not sure I fully understand your proposal. I think you are saying that
any broker can receive and handle the AlterTopicRequest. Let's say a
non-controller broker received an AlterTopicRequest: is this broker going to
send LeaderAndIsrRequest to other brokers, or is this broker going to create
the reassignment znode in zookeeper? I may have missed it, but I couldn't
find the explanation of AlterTopicRequest handling in KIP-179.



>
> KIP-113 is obviously seeking to make more radical changes. The algorithm
> described for moving a replica to a particular directory on a different
> broker (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-2%29Howtoreassignreplicabetweenlogdirectoriesacrossbrokers
> )
> involves both sending AlterReplicaDirRequest to "the" broker (the receiving
> broker, I assume, but it's not spelled out), _as well as_ writing to the ZK
> node.
>
> This assumes the script (ReassignPartitionsCommand) has direct access to
> ZooKeeper, which is what KIP-179 is seeking to deprecate. It seems a waste
> of time to put the logic in the script as part of KIP-113, only for KIP-179
> to have to move it back to the controller.
>

I am not sure I understand what you mean by "It seems a waste of time to
put the logic in the script as part of KIP-113, only for KIP-179 to have to
move it back to the controller". I assume that the logic you mentioned is
"movement of a replica to the specified log directory". This logic (or the
implementation of this logic) resides mainly in KafkaAdminClient and the
broker. The script only needs to parse the json file and call the new API in
AdminClient as appropriate. The logic in the script is therefore small and
can easily be moved to other classes if needed.

Can you clarify why this logic, i.e. the movement of a replica to the
specified log directory, needs to be moved to the controller in KIP-179? I
think it can still be done in the script, and the controller should not need
to worry about the log directory of any replica.

Thanks,
Dong


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-08 Thread Tom Bentley
Hi Dong,

Thanks for your reply.

Yeah I agree with you that the total disk capacity can be useful
> particularly if it is different across brokers but it is probably of
> limited use in most cases. I also expect that most users would have their
> own customized tool to determine the new partition reassignment
> after retrieving the partition distribution using DescribeDirsRequest.


By not providing a tool, you're just forcing people to write their own. So
your expectation will be self-fulfilling. Surely it would be better if the
project provided a tool (perhaps one which did the boring bits and gave
people the option to provide their own customized algorithm).


> And
> that customized tool can probably be easily provided with the configuration
> (e.g. disk capacity, IO parameters) of the disks in the cluster when user
> runs it.
>

Sure, but it would be better if a tool could discover this for itself. At
best you're forcing people into getting the information out-of-band (e.g.
via JMX), but worse would be if they end up using static data that doesn't
change as their cluster evolves over time.


> I am relatively neutral on whether or not we should add this field. If there
> is no strong reason to add this field, I will add it if one or more
> committer recommends to do this.
>

I don't think we should add it to KIP-113: It could be added at a later
date easily enough.

Cheers,

Tom


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-08 Thread Tom Bentley
> >
> > Also, how do you think things would work in the context of KIP-179? Would
> > the tool still invoke these requests or would it be done by the broker
> > receiving the alterTopics/reassignPartitions protocol call?
> >
>
> My gut feel is that the tool will still invoke these requests. But I have a
> few questions to KIP-179 before I can answer this question. For example, is
> AlterTopicsRequest request sent to controller only? If the new assignment
> is not written in zookeeper, how is this information propagated to the new
> controller if the previous controller dies after it receives
> AlterTopicsRequest but before it sends LeaderAndIsrRequest? I can post
> these questions in that discussion thread later.
>
>
Let me answer here (though it's relevant to both KIPs):

As I originally envisaged it, KIP-179's support for reassigning partitions
would have more-or-less taken the logic currently in the
ReassignPartitionsCommand (that is, writing JSON to the
ZkUtils.ReassignPartitionsPath)
and put it behind a suitable network protocol API. Thus it wouldn't matter
which broker received the protocol call: It would be acted on by brokers
being notified of the change in the ZK path, just as currently. This would
have kept the ReassignPartitionsCommand relatively simple, as it currently
is.

KIP-113 is obviously seeking to make more radical changes. The algorithm
described for moving a replica to a particular directory on a different
broker (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreassignreplicabetweenlogdirectoriesacrossbrokers
)
involves both sending AlterReplicaDirRequest to "the" broker (the receiving
broker, I assume, but it's not spelled out), _as well as_ writing to the ZK
node.
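
For reference, the znode payload in question is roughly of the following
shape. The base fields are what kafka-reassign-partitions.sh writes today;
the per-partition log_dirs list is the KIP-113 extension, and its exact
field name should be taken from the KIP rather than from this sketch:

    {"version": 1,
     "partitions": [
       {"topic": "my-topic",
        "partition": 0,
        "replicas": [1, 2],
        "log_dirs": ["/data/disk1", "/data/disk2"]}
     ]}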

This assumes the script (ReassignPartitionsCommand) has direct access to
ZooKeeper, which is what KIP-179 is seeking to deprecate. It seems a waste
of time to put the logic in the script as part of KIP-113, only for KIP-179
to have to move it back to the controller.


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-07 Thread Dong Lin
Hey Tom,

Yeah, I agree with you that the total disk capacity can be useful,
particularly if it differs across brokers, but it is probably of limited use
in most cases. I also expect that most users would have their own customized
tool to determine the new partition reassignment after retrieving the
partition distribution using DescribeDirsRequest. And that customized tool
can probably easily be provided with the configuration (e.g. disk capacity,
IO parameters) of the disks in the cluster when the user runs it.

I am relatively neutral on whether or not we should add this field. I don't
see a strong reason to add it myself, but I will add it if one or more
committers recommend doing so.

Thanks,
Dong



On Mon, Aug 7, 2017 at 1:01 PM, Tom Bentley  wrote:

> Hi Dong,
>
> The reason I thought this would be useful is it seems likely to me that
> people will want to write tools to help them generate allocations. If, as
> you say, all the brokers and all the disks are the same size, then it's not
> too difficult to tell the tool the size of the disk. But if they're not the
> same, then using the tool becomes a lot harder. Obviously if the size of
> the disk is included in the DescribeDirsResponse then you can literally
> just point the tool at the cluster.
>
> On the other hand, it seems likely that tools might also want to take into
> account other things when trying to find a good assignment (per-device IO
> for example) between the disks on a broker, so maybe including the total
> disk capacity is only of limited use.
>
> Cheers,
>
> Tom
>
> On 7 August 2017 at 17:54, Dong Lin  wrote:
>
> > Hey Tom,
> >
> > Good question. We have actually considered having DescribeDirsResponse
> > provide the capacity of each disk as well. This was not included because
> we
> > believe Kafka cluster admin will always configure all brokers with same
> > number of disks of the same size. This is because it is generally easier
> to
> > manager a homogeneous cluster. If this is not the case then I think we
> > should include this information in the response.
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Aug 7, 2017 at 3:44 AM, Tom Bentley 
> wrote:
> >
> > > Hi Dong,
> > >
> > > Your comments on KIP-179 prompted me to look at KIP-113, and I have a
> > > question:
> > >
> > > AFAICS the DescribeDirsResponse (via ReplicaInfo) can be used to get
> the
> > > size of a partition on a disk, but I don't see a mechanism for knowing
> > the
> > > total capacity of a disk (and/or the free capacity of a disk). That
> would
> > > be very useful information to have to help figure out that certain
> > > assignments are impossible, for instance. Is there a reason you've left
> > > this out?
> > >
> > > Cheers,
> > >
> > > Tom
> > >
> > > On 4 August 2017 at 18:47, Dong Lin  wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > Thanks for the comments! Here are my answers:
> > > >
> > > > 1. Yes it has been considered. Here are the reasons why we don't do
> it
> > > > through controller.
> > > >
> > > > - There can be use-cases where we only want to rebalance the load of
> > log
> > > > directories on a given broker. It seems unnecessary to go through
> > > > controller in this case.
> > > >
> > > >  - If controller is responsible for sending ChangeReplicaDirRequest,
> > and
> > > if
> > > > the user-specified log directory is either invalid or offline, then
> > > > controller probably needs a way to tell user that the partition
> > > > reassignment has failed. We currently don't have a way to do this
> since
> > > > kafka-reassign-partition.sh simply creates the reassignment znode
> > without
> > > > waiting for response. I am not sure that is a good solution to this.
> > > >
> > > > - If controller is responsible for sending ChangeReplicaDirRequest,
> the
> > > > controller logic would be more complicated because controller needs
> to
> > > > first send ChangeReplicaRequest so that the broker memorize the
> > partition
> > > > -> log directory mapping, send LeaderAndIsrRequest, and keep sending
> > > > ChangeReplicaDirRequest (just in case broker restarted) until replica
> > is
> > > > created. Note that the last step needs repeat and timeout as the
> > proposed
> > > > in the KIP-113.
> > > >
> > > > Overall I think this adds quite a bit complexity to controller and we
> > > > probably want to do this only if there is a strong and clear reason for doing so.
> > > > Currently in KIP-113 the kafka-reassign-partitions.sh is responsible
> > for
> > > > sending ChangeReplicaDirRequest with repeat and provides error to
> user
> > if
> > > > it either fails or timeout. It seems to be much simpler and user
> > > shouldn't
> > > > care whether it is done through controller.
> > > >
> > > > And thanks for the suggestion. I will add this to the Rejected
> > > Alternative
> > > > Section in the KIP-113.
> > > >
> > > > 2) I think user needs to be able to specify different log 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-07 Thread Tom Bentley
Hi Dong,

The reason I thought this would be useful is it seems likely to me that
people will want to write tools to help them generate allocations. If, as
you say, all the brokers and all the disks are the same size, then it's not
too difficult to tell the tool the size of the disk. But if they're not the
same, then using the tool becomes a lot harder. Obviously if the size of
the disk is included in the DescribeDirsResponse then you can literally
just point the tool at the cluster.

On the other hand, it seems likely that tools might also want to take into
account other things when trying to find a good assignment (per-device IO
for example) between the disks on a broker, so maybe including the total
disk capacity is only of limited use.
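
As an illustration of why that would help: a tool could combine the
per-replica sizes that DescribeDirsResponse does expose with a known
per-directory capacity to reject impossible assignments. The Replica type
and field names below are stand-ins, not the actual protocol classes:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in sketch: sum replica sizes per log directory. Comparing these sums
    // against a per-directory capacity (the field being asked about) would let a
    // tool reject assignments that cannot fit.
    final class DirUsage {
        static final class Replica {
            final String logDir;
            final long sizeBytes;
            Replica(String logDir, long sizeBytes) { this.logDir = logDir; this.sizeBytes = sizeBytes; }
        }

        static Map<String, Long> usedBytesPerDir(Iterable<Replica> replicas) {
            Map<String, Long> used = new HashMap<>();
            for (Replica r : replicas) {
                used.merge(r.logDir, r.sizeBytes, Long::sum);
            }
            return used;
        }
    }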

Cheers,

Tom

On 7 August 2017 at 17:54, Dong Lin  wrote:

> Hey Tom,
>
> Good question. We have actually considered having DescribeDirsResponse
> provide the capacity of each disk as well. This was not included because we
> believe Kafka cluster admin will always configure all brokers with same
> number of disks of the same size. This is because it is generally easier to
> manage a homogeneous cluster. If this is not the case then I think we
> should include this information in the response.
>
> Thanks,
> Dong
>
>
> On Mon, Aug 7, 2017 at 3:44 AM, Tom Bentley  wrote:
>
> > Hi Dong,
> >
> > Your comments on KIP-179 prompted me to look at KIP-113, and I have a
> > question:
> >
> > AFAICS the DescribeDirsResponse (via ReplicaInfo) can be used to get the
> > size of a partition on a disk, but I don't see a mechanism for knowing
> the
> > total capacity of a disk (and/or the free capacity of a disk). That would
> > be very useful information to have to help figure out that certain
> > assignments are impossible, for instance. Is there a reason you've left
> > this out?
> >
> > Cheers,
> >
> > Tom
> >
> > On 4 August 2017 at 18:47, Dong Lin  wrote:
> >
> > > Hey Ismael,
> > >
> > > Thanks for the comments! Here are my answers:
> > >
> > > 1. Yes it has been considered. Here are the reasons why we don't do it
> > > through controller.
> > >
> > > - There can be use-cases where we only want to rebalance the load of
> log
> > > directories on a given broker. It seems unnecessary to go through
> > > controller in this case.
> > >
> > >  - If controller is responsible for sending ChangeReplicaDirRequest,
> and
> > if
> > > the user-specified log directory is either invalid or offline, then
> > > controller probably needs a way to tell user that the partition
> > > reassignment has failed. We currently don't have a way to do this since
> > > kafka-reassign-partition.sh simply creates the reassignment znode
> without
> > > waiting for response. I am not sure that is a good solution to this.
> > >
> > > - If controller is responsible for sending ChangeReplicaDirRequest, the
> > > controller logic would be more complicated because controller needs to
> > > first send ChangeReplicaRequest so that the broker memorize the
> partition
> > > -> log directory mapping, send LeaderAndIsrRequest, and keep sending
> > > ChangeReplicaDirRequest (just in case broker restarted) until replica
> is
> > > created. Note that the last step needs repeat and timeout as the
> proposed
> > > in the KIP-113.
> > >
> > > Overall I think this adds quite a bit complexity to controller and we
> > > probably want to do this only if there is a strong and clear reason for doing so.
> > > Currently in KIP-113 the kafka-reassign-partitions.sh is responsible
> for
> > > sending ChangeReplicaDirRequest with repeat and provides error to user
> if
> > > it either fails or timeout. It seems to be much simpler and user
> > shouldn't
> > > care whether it is done through controller.
> > >
> > > And thanks for the suggestion. I will add this to the Rejected
> > Alternative
> > > Section in the KIP-113.
> > >
> > > 2) I think user needs to be able to specify different log directories
> for
> > > the replicas of the same partition in order to rebalance load across
> log
> > > directories of all brokers. I am not sure I understand the question.
> Can
> > > you explain a bit more why "that the log directory has to be the same
> for
> > > all replicas of a given partition"?
> > >
> > > 3) Good point. I think the alterReplicaDir is a better than
> > > changeReplicaDir for the reason you provided. I will also update names
> of
> > > the request/response as well in the KIP.
> > >
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Aug 4, 2017 at 9:49 AM, Ismael Juma  wrote:
> > >
> > > > Thanks Dong. I have a few initial questions, sorry if it has been
> > > > discussed and I missed it.
> > > >
> > > > 1. The KIP suggests that the reassignment tool is responsible for
> > sending
> > > > the ChangeReplicaDirRequests to the relevant brokers. I had imagined
> > that
> > > > this would be done by the Controller, like the rest of the
> reassignment
> > > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-07 Thread Dong Lin
Hey Ismael,

Thanks much for your comments. Please see my reply inline.

On Mon, Aug 7, 2017 at 5:28 AM, Ismael Juma  wrote:

> Hi Dong,
>
> Thanks for the explanation. Comments inline.
>
> On Fri, Aug 4, 2017 at 6:47 PM, Dong Lin  wrote:
>
> > 1. Yes it has been considered. Here are the reasons why we don't do it
> > through controller.
> >
> > - There can be use-cases where we only want to rebalance the load of log
> > directories on a given broker. It seems unnecessary to go through
> > controller in this case.
> >
>
> Even though this is true, not sure how common it will be.
>

I think the frequency of the need to balance across disks on the same
broker will be considerably higher (e.g. 2X or more) than the frequency of
the need to balance across brokers. This is because the underlying replicas
have the same size distribution, but the capacity of a broker can be 10X the
capacity of a single disk.

I don't think this is a strong argument for having this logic only in the
tool instead of the controller. It is a nice-to-have feature if there is no
strong reason to do it in the controller.


>
>  - If controller is responsible for sending ChangeReplicaDirRequest, and if
> > the user-specified log directory is either invalid or offline, then
> > controller probably needs a way to tell user that the partition
> > reassignment has failed. We currently don't have a way to do this since
> > kafka-reassign-partition.sh simply creates the reassignment znode without
> > waiting for response. I am not sure that is a good solution to this.
> >
>
> Since the JSON is provided by the user, we would ideally validate its
> contents before storing it. Why are the log directories different than the
> other information in the JSON?


I think there are two differences between the log directory field and the
other fields in the JSON:

- The log directory field takes many more bytes than the other fields in the
reassignment znode. Due to the 1MB size limit of a znode, Kafka admins
currently have to split a large reassignment into multiple smaller
reassignments, which limits the number of partitions that can be moved
concurrently. Currently the reassignment znode has one integer for each
replica; the log directory will introduce probably 10+ characters for each
replica. This can significantly lower the number of partitions that can be
reassigned at the same time (a rough illustration of the arithmetic is given
below).

- All other fields in the reassignment znode can be found and verified
against other znodes in zookeeper. Thus the controller only needs to register
a ZK listener to be notified once the reassignment completes. However, the
log directory of each replica is not in zookeeper. The controller would have
to periodically send DescribeDirsRequest to check whether the replica has
been successfully moved to the destination log directory. Currently there is
nothing like this in the controller logic. Ideally we want to avoid adding
this complexity and performance overhead in the controller.
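
As a rough, purely illustrative calculation (the replica count and path
length are assumptions, not numbers from the KIP): a reassignment covering
10,000 replicas needs on the order of 10,000 x ~4 characters, i.e. roughly
40 KB of JSON, for the broker ids alone, whereas adding a ~30-character log
directory path per replica adds roughly 10,000 x 30 = 300 KB, which is
already a large fraction of the 1 MB znode limit.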



> - If controller is responsible for sending ChangeReplicaDirRequest, the
> > controller logic would be more complicated because controller needs to
> > first send ChangeReplicaRequest so that the broker memorize the partition
> > -> log directory mapping, send LeaderAndIsrRequest, and keep sending
> > ChangeReplicaDirRequest (just in case broker restarted) until replica is
> > created. Note that the last step needs repeat and timeout as the proposed
> > in the KIP-113.
> >
> > Overall I think this adds quite a bit complexity to controller and we
> > probably want to do this only if there is strong clear of doing so.
> > Currently in KIP-113 the kafka-reassign-partitions.sh is responsible for
> > sending ChangeReplicaDirRequest with repeat and provides error to user if
> > it either fails or timeout. It seems to be much simpler and user
> shouldn't
> > care whether it is done through controller.
> >
>
> If I understand correctly, the logic is the same in both cases, it's just a
> question of where it lives. The advantage of placing it in the Controller
> is that the whole reassignment logic is in one place (instead of split
> between the tool and the Controller). That seems easier to reason about.
>

It seems that the main motivation for putting this logic in the controller is
to simplify the work for Kafka developers. I agree it is desirable to put the
logic in the same place. On the other hand, we also want to keep the
controller simple and efficient.

I actually did this in the original design, but later I was convinced by Jun
that it is simpler to put the new logic in the reassignment tool. I think we
can put this logic in the controller if we can find a good solution to the
problems described above.


>
> Also, how do you think things would work in the context of KIP-179? Would
> the tool still invoke these requests or would it be done by the broker
> receiving the alterTopics/reassignPartitions protocol call?
>

My gut feel is that the tool will still invoke these requests. But I 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-07 Thread Dong Lin
Hey Tom,

Good question. We have actually considered having DescribeDirsResponse
provide the capacity of each disk as well. This was not included because we
believe Kafka cluster admins will always configure all brokers with the same
number of disks of the same size, since it is generally easier to manage a
homogeneous cluster. If this is not the case then I think we should include
this information in the response.

Thanks,
Dong


On Mon, Aug 7, 2017 at 3:44 AM, Tom Bentley  wrote:

> Hi Dong,
>
> Your comments on KIP-179 prompted me to look at KIP-113, and I have a
> question:
>
> AFAICS the DescribeDirsResponse (via ReplicaInfo) can be used to get the
> size of a partition on a disk, but I don't see a mechanism for knowing the
> total capacity of a disk (and/or the free capacity of a disk). That would
> be very useful information to have to help figure out that certain
> assignments are impossible, for instance. Is there a reason you've left
> this out?
>
> Cheers,
>
> Tom
>
> On 4 August 2017 at 18:47, Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > Thanks for the comments! Here are my answers:
> >
> > 1. Yes it has been considered. Here are the reasons why we don't do it
> > through controller.
> >
> > - There can be use-cases where we only want to rebalance the load of log
> > directories on a given broker. It seems unnecessary to go through
> > controller in this case.
> >
> >  - If controller is responsible for sending ChangeReplicaDirRequest, and
> if
> > the user-specified log directory is either invalid or offline, then
> > controller probably needs a way to tell user that the partition
> > reassignment has failed. We currently don't have a way to do this since
> > kafka-reassign-partition.sh simply creates the reassignment znode without
> > waiting for response. I am not sure that is a good solution to this.
> >
> > - If the controller is responsible for sending ChangeReplicaDirRequest, the
> > controller logic would be more complicated because the controller needs to
> > first send ChangeReplicaDirRequest so that the broker memorizes the partition
> > -> log directory mapping, send LeaderAndIsrRequest, and keep sending
> > ChangeReplicaDirRequest (in case the broker restarted) until the replica is
> > created. Note that the last step needs to be repeated with a timeout, as
> > proposed in KIP-113.
> >
> > Overall I think this adds quite a bit of complexity to the controller and we
> > probably want to do this only if there is a strong, clear reason for doing so.
> > Currently in KIP-113, kafka-reassign-partitions.sh is responsible for
> > sending ChangeReplicaDirRequest with retries and reports an error to the user
> > if it either fails or times out. It seems to be much simpler, and the user
> > shouldn't care whether it is done through the controller.
> >
> > And thanks for the suggestion. I will add this to the Rejected Alternatives
> > section of KIP-113.
> >
> > 2) I think user needs to be able to specify different log directories for
> > the replicas of the same partition in order to rebalance load across log
> > directories of all brokers. I am not sure I understand the question. Can
> > you explain a bit more why "that the log directory has to be the same for
> > all replicas of a given partition"?
> >
> > 3) Good point. I think alterReplicaDir is a better name than
> > changeReplicaDir, for the reason you provided. I will also update the names
> > of the request/response in the KIP.
> >
> >
> > Thanks,
> > Dong
> >
> > On Fri, Aug 4, 2017 at 9:49 AM, Ismael Juma  wrote:
> >
> > > Thanks Dong. I have a few initial questions; sorry if this has been
> > > discussed and I missed it.
> > >
> > > 1. The KIP suggests that the reassignment tool is responsible for
> sending
> > > the ChangeReplicaDirRequests to the relevant brokers. I had imagined
> that
> > > this would be done by the Controller, like the rest of the reassignment
> > > process. Was this considered? If so, it would be good to include the
> > > details of why it was rejected in the "Rejected Alternatives" section.
> > >
> > > 2. The reassignment JSON format was extended so that one can choose the
> > log
> > > directory for a partition. This means that the log directory has to be
> > the
> > > same for all replicas of a given partition. The alternative would be
> for
> > > the log dir to be assignable for each replica. Similar to the other
> > > question, it would be good to have a section in "Rejected Alternatives"
> > for
> > > this approach. It's generally very helpful to have more information on
> > the
> > > rationale for the design choices that were made and rejected.
> > >
> > > 3. Should changeReplicaDir be alterReplicaDir? We have used `alter` for
> > > other methods.
> > >
> > > Thanks,
> > > Ismael
> > >
> > >
> > >
> > >
> > > On Fri, Aug 4, 2017 at 5:37 AM, Dong Lin  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I realized that we need new API in AdminClient in order to use 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-07 Thread Ismael Juma
Hi Dong,

Thanks for the explanation. Comments inline.

On Fri, Aug 4, 2017 at 6:47 PM, Dong Lin  wrote:

> 1. Yes it has been considered. Here are the reasons why we don't do it
> through controller.
>
> - There can be use-cases where we only want to rebalance the load of log
> directories on a given broker. It seems unnecessary to go through
> controller in this case.
>

Even though this is true, not sure how common it will be.

>  - If the controller is responsible for sending ChangeReplicaDirRequest, and if
> the user-specified log directory is either invalid or offline, then the
> controller probably needs a way to tell the user that the partition
> reassignment has failed. We currently don't have a way to do this since
> kafka-reassign-partitions.sh simply creates the reassignment znode without
> waiting for a response. I am not sure that is a good solution to this.
>

Since the JSON is provided by the user, we would ideally validate its
contents before storing it. Why are the log directories different than the
other information in the JSON?

> - If the controller is responsible for sending ChangeReplicaDirRequest, the
> controller logic would be more complicated because the controller needs to
> first send ChangeReplicaDirRequest so that the broker memorizes the partition
> -> log directory mapping, send LeaderAndIsrRequest, and keep sending
> ChangeReplicaDirRequest (in case the broker restarted) until the replica is
> created. Note that the last step needs to be repeated with a timeout, as
> proposed in KIP-113.
>
> Overall I think this adds quite a bit of complexity to the controller and we
> probably want to do this only if there is a strong, clear reason for doing so.
> Currently in KIP-113, kafka-reassign-partitions.sh is responsible for
> sending ChangeReplicaDirRequest with retries and reports an error to the user
> if it either fails or times out. It seems to be much simpler, and the user
> shouldn't care whether it is done through the controller.
>

If I understand correctly, the logic is the same in both cases, it's just a
question of where it lives. The advantage of placing it in the Controller
is that the whole reassignment logic is in one place (instead of split
between the tool and the Controller). That seems easier to reason about.

Also, how do you think things would work in the context of KIP-179? Would
the tool still invoke these requests or would it be done by the broker
receiving the alterTopics/reassignPartitions protocol call?

> And thanks for the suggestion. I will add this to the Rejected Alternatives
> section of KIP-113.
>
> 2) I think user needs to be able to specify different log directories for
> the replicas of the same partition in order to rebalance load across log
> directories of all brokers. I am not sure I understand the question. Can
> you explain a bit more why "that the log directory has to be the same for
> all replicas of a given partition"?


I think I misunderstood the schema. The KIP has the following example:

"partitions" : [
{
  "topic" : str,
  "partition" : int,
  "replicas" : [int],
  "log_dirs" : [str]<-- NEW. A log directory can be either "any",
or a valid absolute path that begins with '/'. This is an optional filed.
It is treated as an array of "any" if this field is not explicitly
specified in the json file.
},
...
  ]

Is it right that `log_dirs` is an array in the same order as `replicas`?
That's a bit obscure and we should document it more clearly. Did we discard
the option of a more readable schema (i.e. a JSON object mapping a replica
id to a log dir) due to efficiency concerns? It would be good to include
that in the KIP.
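
For what it's worth, assuming the ordering interpretation above is right, a
hypothetical entry would look like the following (the topic name, broker ids
and paths are made up): partition 0 of "foo" is assigned to brokers 1, 2 and 3,
its replica on broker 1 goes to /data1, and the other two replicas stay in
whichever directory their brokers pick:

{
  "version" : 1,
  "partitions" : [
    {
      "topic" : "foo",
      "partition" : 0,
      "replicas" : [1, 2, 3],
      "log_dirs" : ["/data1", "any", "any"]
    }
  ]
}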

Thanks,
Ismael


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-07 Thread Tom Bentley
Hi Dong,

Your comments on KIP-179 prompted me to look at KIP-113, and I have a
question:

AFAICS the DescribeDirsResponse (via ReplicaInfo) can be used to get the
size of a partition on a disk, but I don't see a mechanism for knowing the
total capacity of a disk (and/or the free capacity of a disk). That would
be very useful information to have to help figure out that certain
assignments are impossible, for instance. Is there a reason you've left
this out?

Cheers,

Tom

On 4 August 2017 at 18:47, Dong Lin  wrote:

> Hey Ismael,
>
> Thanks for the comments! Here are my answers:
>
> 1. Yes it has been considered. Here are the reasons why we don't do it
> through controller.
>
> - There can be use-cases where we only want to rebalance the load of log
> directories on a given broker. It seems unnecessary to go through
> controller in this case.
>
>  - If the controller is responsible for sending ChangeReplicaDirRequest, and if
> the user-specified log directory is either invalid or offline, then the
> controller probably needs a way to tell the user that the partition
> reassignment has failed. We currently don't have a way to do this since
> kafka-reassign-partitions.sh simply creates the reassignment znode without
> waiting for a response. I am not sure that is a good solution to this.
>
> - If the controller is responsible for sending ChangeReplicaDirRequest, the
> controller logic would be more complicated because the controller needs to
> first send ChangeReplicaDirRequest so that the broker memorizes the partition
> -> log directory mapping, send LeaderAndIsrRequest, and keep sending
> ChangeReplicaDirRequest (in case the broker restarted) until the replica is
> created. Note that the last step needs to be repeated with a timeout, as
> proposed in KIP-113.
>
> Overall I think this adds quite a bit of complexity to the controller and we
> probably want to do this only if there is a strong, clear reason for doing so.
> Currently in KIP-113, kafka-reassign-partitions.sh is responsible for
> sending ChangeReplicaDirRequest with retries and reports an error to the user
> if it either fails or times out. It seems to be much simpler, and the user
> shouldn't care whether it is done through the controller.
>
> And thanks for the suggestion. I will add this to the Rejected Alternatives
> section of KIP-113.
>
> 2) I think user needs to be able to specify different log directories for
> the replicas of the same partition in order to rebalance load across log
> directories of all brokers. I am not sure I understand the question. Can
> you explain a bit more why "that the log directory has to be the same for
> all replicas of a given partition"?
>
> 3) Good point. I think alterReplicaDir is a better name than
> changeReplicaDir, for the reason you provided. I will also update the names
> of the request/response in the KIP.
>
>
> Thanks,
> Dong
>
> On Fri, Aug 4, 2017 at 9:49 AM, Ismael Juma  wrote:
>
> > Thanks Dong. I have a few initial questions; sorry if this has been
> > discussed and I missed it.
> >
> > 1. The KIP suggests that the reassignment tool is responsible for sending
> > the ChangeReplicaDirRequests to the relevant brokers. I had imagined that
> > this would be done by the Controller, like the rest of the reassignment
> > process. Was this considered? If so, it would be good to include the
> > details of why it was rejected in the "Rejected Alternatives" section.
> >
> > 2. The reassignment JSON format was extended so that one can choose the
> log
> > directory for a partition. This means that the log directory has to be
> the
> > same for all replicas of a given partition. The alternative would be for
> > the log dir to be assignable for each replica. Similar to the other
> > question, it would be good to have a section in "Rejected Alternatives"
> for
> > this approach. It's generally very helpful to have more information on
> the
> > rationale for the design choices that were made and rejected.
> >
> > 3. Should changeReplicaDir be alterReplicaDir? We have used `alter` for
> > other methods.
> >
> > Thanks,
> > Ismael
> >
> >
> >
> >
> > On Fri, Aug 4, 2017 at 5:37 AM, Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > I realized that we need new API in AdminClient in order to use the new
> > > request/response added in KIP-113. Since this is required by KIP-113, I
> > > choose to add the new interface in this KIP instead of creating a new
> > KIP.
> > >
> > > The documentation of the new API in AdminClient can be found here
> > >  > > 113%3A+Support+replicas+movement+between+log+directories#KIP-113:
> > > Supportreplicasmovementbetweenlogdirectories-AdminClient>.
> > > Can you please review and comment if you have any concern?
> > >
> > > Thanks!
> > > Dong
> > >
> > >
> > >
> > > On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin 
> wrote:
> > >
> > > > The protocol change has been updated in KIP-113
> > > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-04 Thread Dong Lin
Hey Ismael,

Thanks for the comments! Here are my answers:

1. Yes, it has been considered. Here are the reasons why we don't do it
through the controller.

- There can be use-cases where we only want to rebalance the load of log
directories on a given broker. It seems unnecessary to go through the
controller in this case.

- If the controller is responsible for sending ChangeReplicaDirRequest, and if
the user-specified log directory is either invalid or offline, then the
controller probably needs a way to tell the user that the partition
reassignment has failed. We currently don't have a way to do this since
kafka-reassign-partitions.sh simply creates the reassignment znode without
waiting for a response. I am not sure that is a good solution to this.

- If the controller is responsible for sending ChangeReplicaDirRequest, the
controller logic would be more complicated because the controller needs to
first send ChangeReplicaDirRequest so that the broker memorizes the partition
-> log directory mapping, send LeaderAndIsrRequest, and keep sending
ChangeReplicaDirRequest (in case the broker restarted) until the replica is
created. Note that the last step needs to be repeated with a timeout, as
proposed in KIP-113.

Overall I think this adds quite a bit of complexity to the controller and we
probably want to do this only if there is a strong, clear reason for doing so.
Currently in KIP-113, kafka-reassign-partitions.sh is responsible for
sending ChangeReplicaDirRequest with retries and reports an error to the user
if it either fails or times out. It seems to be much simpler, and the user
shouldn't care whether it is done through the controller.
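
For illustration only, a rough sketch of what that tool-side "send with retries
and a timeout" loop could look like, written against the AdminClient method that
eventually exposed this request (alterReplicaLogDirs). The idea that a
not-yet-created replica surfaces as ReplicaNotAvailableException, and the retry
interval, are assumptions here rather than part of the KIP:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class MoveReplicaWithRetry {

    // Keep re-sending the request until the broker has accepted the move into
    // targetDir, or until the timeout expires.
    static void moveReplica(Admin admin, TopicPartitionReplica replica,
                            String targetDir, long timeoutMs) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        Map<TopicPartitionReplica, String> assignment =
                Collections.singletonMap(replica, targetDir);
        while (true) {
            try {
                admin.alterReplicaLogDirs(assignment).all().get();
                return;  // the broker has recorded / performed the movement
            } catch (ExecutionException e) {
                // The replica may not have been created yet (e.g. the controller
                // has not sent LeaderAndIsrRequest); retry until the deadline.
                boolean retriable = e.getCause() instanceof ReplicaNotAvailableException;
                if (!retriable || System.currentTimeMillis() > deadline) {
                    throw e;
                }
                Thread.sleep(1000);
            }
        }
    }
}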

And thanks for the suggestion. I will add this to the Rejected Alternatives
section of KIP-113.

2) I think the user needs to be able to specify different log directories for
the replicas of the same partition in order to rebalance load across log
directories of all brokers. I am not sure I understand the question. Can
you explain a bit more why "that the log directory has to be the same for
all replicas of a given partition"?

3) Good point. I think alterReplicaDir is a better name than
changeReplicaDir, for the reason you provided. I will also update the names of
the request/response in the KIP.


Thanks,
Dong

On Fri, Aug 4, 2017 at 9:49 AM, Ismael Juma  wrote:

> Thanks Dong. I have a few initial questions; sorry if this has been
> discussed and I missed it.
>
> 1. The KIP suggests that the reassignment tool is responsible for sending
> the ChangeReplicaDirRequests to the relevant brokers. I had imagined that
> this would be done by the Controller, like the rest of the reassignment
> process. Was this considered? If so, it would be good to include the
> details of why it was rejected in the "Rejected Alternatives" section.
>
> 2. The reassignment JSON format was extended so that one can choose the log
> directory for a partition. This means that the log directory has to be the
> same for all replicas of a given partition. The alternative would be for
> the log dir to be assignable for each replica. Similar to the other
> question, it would be good to have a section in "Rejected Alternatives" for
> this approach. It's generally very helpful to have more information on the
> rationale for the design choices that were made and rejected.
>
> 3. Should changeReplicaDir be alterReplicaDir? We have used `alter` for
> other methods.
>
> Thanks,
> Ismael
>
>
>
>
> On Fri, Aug 4, 2017 at 5:37 AM, Dong Lin  wrote:
>
> > Hi all,
> >
> > I realized that we need new API in AdminClient in order to use the new
> > request/response added in KIP-113. Since this is required by KIP-113, I
> > choose to add the new interface in this KIP instead of creating a new
> KIP.
> >
> > The documentation of the new API in AdminClient can be found here
> >  > 113%3A+Support+replicas+movement+between+log+directories#KIP-113:
> > Supportreplicasmovementbetweenlogdirectories-AdminClient>.
> > Can you please review and comment if you have any concern?
> >
> > Thanks!
> > Dong
> >
> >
> >
> > On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin  wrote:
> >
> > > The protocol change has been updated in KIP-113
> > >  > 113%3A+Support+replicas+movement+between+log+directories>
> > > .
> > >
> > > On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin 
> wrote:
> > >
> > >> Hi all,
> > >>
> > >> I have made a minor change to the DescribeDirsRequest so that user can
> > >> choose to query the status for a specific list of partitions. This is
> a
> > bit
> > >> more fine-granular than the previous format that allows user to query
> > the
> > >> status for a specific list of topics. I realized that querying the
> > status
> > >> of selected partitions can be useful to check the whether the
> > reassignment
> > >> of the replicas to the specific log directories has been completed.
> > >>
> > >> I will assume 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-04 Thread Ismael Juma
Thanks Dong. I have a few initial questions; sorry if this has been
discussed and I missed it.

1. The KIP suggests that the reassignment tool is responsible for sending
the ChangeReplicaDirRequests to the relevant brokers. I had imagined that
this would be done by the Controller, like the rest of the reassignment
process. Was this considered? If so, it would be good to include the
details of why it was rejected in the "Rejected Alternatives" section.

2. The reassignment JSON format was extended so that one can choose the log
directory for a partition. This means that the log directory has to be the
same for all replicas of a given partition. The alternative would be for
the log dir to be assignable for each replica. Similar to the other
question, it would be good to have a section in "Rejected Alternatives" for
this approach. It's generally very helpful to have more information on the
rationale for the design choices that were made and rejected.

3. Should changeReplicaDir be alterReplicaDir? We have used `alter` for
other methods.

Thanks,
Ismael




On Fri, Aug 4, 2017 at 5:37 AM, Dong Lin  wrote:

> Hi all,
>
> I realized that we need new API in AdminClient in order to use the new
> request/response added in KIP-113. Since this is required by KIP-113, I
> choose to add the new interface in this KIP instead of creating a new KIP.
>
> The documentation of the new API in AdminClient can be found here
>  113%3A+Support+replicas+movement+between+log+directories#KIP-113:
> Supportreplicasmovementbetweenlogdirectories-AdminClient>.
> Can you please review and comment if you have any concern?
>
> Thanks!
> Dong
>
>
>
> On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin  wrote:
>
> > The protocol change has been updated in KIP-113
> >  113%3A+Support+replicas+movement+between+log+directories>
> > .
> >
> > On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin  wrote:
> >
> >> Hi all,
> >>
> >> I have made a minor change to the DescribeDirsRequest so that user can
> >> choose to query the status for a specific list of partitions. This is a
> bit
> >> more fine-grained than the previous format that allows the user to query
> the
> >> status for a specific list of topics. I realized that querying the
> status
> >> of selected partitions can be useful to check whether the
> reassignment
> >> of the replicas to the specific log directories has been completed.
> >>
> >> I will assume this minor change is OK if there is no concern with it in
> >> the community :)
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >>
> >> On Mon, Jun 12, 2017 at 10:46 AM, Dong Lin  wrote:
> >>
> >>> Hey Colin,
> >>>
> >>> Thanks for the suggestion. We have actually considered this and list
> >>> this as the first future work in KIP-112
> >>>  112%3A+Handle+disk+failure+for+JBOD>.
> >>> The two advantages that you mentioned are exactly the motivation for
> this
> >>> feature. Also as you have mentioned, this involves the tradeoff between
> >>> disk performance and availability -- the more you distribute topic
> across
> >>> disks, the more topics will be offline due to a single disk failure.
> >>>
> >>> Despite its complexity, it is not clear to me that the reduced
> rebalance
> >>> overhead is worth the reduction in availability. I am optimistic that
> the
> >>> rebalance overhead will not be that a big problem since we are not too
> >>> bothered by cross-broker rebalance as of now.
> >>>
> >>> Thanks,
> >>> Dong
> >>>
> >>> On Mon, Jun 12, 2017 at 10:36 AM, Colin McCabe 
> >>> wrote:
> >>>
>  Has anyone considered a scheme for sharding topic data across multiple
>  disks?
> 
>  For example, if you sharded topics across 3 disks, and you had 10
> disks,
>  you could pick a different set of 3 disks for each topic.  If you
>  distribute them randomly then you have 10 choose 3 = 120 different
>  combinations.  You would probably never need rebalancing if you had a
>  reasonable distribution of topic sizes (could probably prove this
> with a
>  Monte Carlo or something).
> 
>  The disadvantage is that if one of the 3 disks fails, then you have to
>  take the topic offline.  But if we assume independent disk failure
>  probabilities, probability of failure with RAID 0 is: 1 -
>  Psuccess^(num_disks) whereas the probability of failure with this
> scheme
>  is 1 - Psuccess ^ 3.
> 
>  This addresses the biggest downsides of JBOD now:
>  * limiting a topic to the size of a single disk limits scalability
>  * the topic movement process is tricky to get right and involves
> "racing
>  against producers" and wasted double I/Os
> 
>  Of course, one other question is how frequently we add new disk drives
>  to an 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-08-03 Thread Dong Lin
Hi all,

I realized that we need a new API in AdminClient in order to use the new
request/response added in KIP-113. Since this is required by KIP-113, I
choose to add the new interface in this KIP instead of creating a new KIP.

The documentation of the new API in AdminClient can be found here
.
Can you please review and comment if you have any concern?

Thanks!
Dong



On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin  wrote:

> The protocol change has been updated in KIP-113
> 
> .
>
> On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin  wrote:
>
>> Hi all,
>>
>> I have made a minor change to the DescribeDirsRequest so that the user can
>> choose to query the status for a specific list of partitions. This is a bit
>> more fine-grained than the previous format, which allows the user to query the
>> status for a specific list of topics. I realized that querying the status
>> of selected partitions can be useful to check whether the reassignment
>> of the replicas to the specific log directories has been completed.
>>
>> I will assume this minor change is OK if there is no concern with it in
>> the community :)
>>
>> Thanks,
>> Dong
>>
>>
>>
>> On Mon, Jun 12, 2017 at 10:46 AM, Dong Lin  wrote:
>>
>>> Hey Colin,
>>>
>>> Thanks for the suggestion. We have actually considered this and list
>>> this as the first future work in KIP-112
>>> .
>>> The two advantages that you mentioned are exactly the motivation for this
>>> feature. Also as you have mentioned, this involves the tradeoff between
>>> disk performance and availability -- the more you distribute topic across
>>> disks, the more topics will be offline due to a single disk failure.
>>>
>>> Despite its complexity, it is not clear to me that the reduced rebalance
>>> overhead is worth the reduction in availability. I am optimistic that the
>>> rebalance overhead will not be that a big problem since we are not too
>>> bothered by cross-broker rebalance as of now.
>>>
>>> Thanks,
>>> Dong
>>>
>>> On Mon, Jun 12, 2017 at 10:36 AM, Colin McCabe 
>>> wrote:
>>>
 Has anyone considered a scheme for sharding topic data across multiple
 disks?

 For example, if you sharded topics across 3 disks, and you had 10 disks,
 you could pick a different set of 3 disks for each topic.  If you
 distribute them randomly then you have 10 choose 3 = 120 different
 combinations.  You would probably never need rebalancing if you had a
 reasonable distribution of topic sizes (could probably prove this with a
 Monte Carlo or something).

 The disadvantage is that if one of the 3 disks fails, then you have to
 take the topic offline.  But if we assume independent disk failure
 probabilities, probability of failure with RAID 0 is: 1 -
 Psuccess^(num_disks) whereas the probability of failure with this scheme
 is 1 - Psuccess ^ 3.

 This addresses the biggest downsides of JBOD now:
 * limiting a topic to the size of a single disk limits scalability
 * the topic movement process is tricky to get right and involves "racing
 against producers" and wasted double I/Os

 Of course, one other question is how frequently we add new disk drives
 to an existing broker.  In this case, you might reasonably want disk
 rebalancing to avoid overloading the new disk(s) with writes.

 cheers,
 Colin


 On Fri, Jun 9, 2017, at 18:46, Jun Rao wrote:
 > Just a few comments on this.
 >
 > 1. One of the issues with using RAID 0 is that a single disk failure
 > causes
 > a hard failure of the broker. Hard failure increases the
 unavailability
 > window for all the partitions on the failed broker, which includes the
 > failure detection time (tied to ZK session timeout right now) and
 leader
 > election time by the controller. If we support JBOD natively, when a
 > single
 > disk fails, only partitions on the failed disk will experience a hard
> failure. The availability for partitions on the rest of the disks is
 not
 > affected.
 >
 > 2. For running things on the Cloud such as AWS. Currently, each EBS
 > volume
> has a throughput limit of about 300MB/sec. If you get an enhanced EC2
 > instance, you can get 20Gb/sec network. To saturate the network, you
 may
 > need about 7 EBS volumes. So, being able to support JBOD in the Cloud
 is
 > still potentially useful.
 >
 > 3. On the benefit of balancing data across disks within the same
 broker.
 > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-07-12 Thread Dong Lin
Hi all,

I have made a minor change to the DescribeDirsRequest so that the user can
choose to query the status for a specific list of partitions. This is a bit
more fine-grained than the previous format, which allows the user to query the
status for a specific list of topics. I realized that querying the status
of selected partitions can be useful to check whether the reassignment
of the replicas to the specific log directories has been completed.
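
For illustration, a small sketch of such a completeness check, written against
the AdminClient API as it eventually shipped (describeLogDirs, with the
per-replica isFuture flag). For simplicity this sketch filters to the partition
of interest on the client side rather than in the request itself:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

public class CheckMoveCompleted {

    // True once the partition is hosted in targetDir on the broker and is no
    // longer a temporary "future" replica, i.e. the intra-broker move is done.
    static boolean moveCompleted(Admin admin, int brokerId,
                                 TopicPartition tp, String targetDir) throws Exception {
        Map<String, LogDirDescription> dirs =
                admin.describeLogDirs(Collections.singletonList(brokerId))
                     .allDescriptions().get().get(brokerId);
        LogDirDescription dir = dirs.get(targetDir);
        if (dir == null) return false;
        ReplicaInfo info = dir.replicaInfos().get(tp);
        return info != null && !info.isFuture();
    }
}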

I will assume this minor change is OK if there is no concern with it in the
community :)

Thanks,
Dong



On Mon, Jun 12, 2017 at 10:46 AM, Dong Lin  wrote:

> Hey Colin,
>
> Thanks for the suggestion. We have actually considered this and list this
> as the first future work in KIP-112
> .
> The two advantages that you mentioned are exactly the motivation for this
> feature. Also as you have mentioned, this involves the tradeoff between
> disk performance and availability -- the more you distribute topic across
> disks, the more topics will be offline due to a single disk failure.
>
> Despite its complexity, it is not clear to me that the reduced rebalance
> overhead is worth the reduction in availability. I am optimistic that the
> rebalance overhead will not be that a big problem since we are not too
> bothered by cross-broker rebalance as of now.
>
> Thanks,
> Dong
>
> On Mon, Jun 12, 2017 at 10:36 AM, Colin McCabe  wrote:
>
>> Has anyone considered a scheme for sharding topic data across multiple
>> disks?
>>
>> For example, if you sharded topics across 3 disks, and you had 10 disks,
>> you could pick a different set of 3 disks for each topic.  If you
>> distribute them randomly then you have 10 choose 3 = 120 different
>> combinations.  You would probably never need rebalancing if you had a
>> reasonable distribution of topic sizes (could probably prove this with a
>> Monte Carlo or something).
>>
>> The disadvantage is that if one of the 3 disks fails, then you have to
>> take the topic offline.  But if we assume independent disk failure
>> probabilities, probability of failure with RAID 0 is: 1 -
>> Psuccess^(num_disks) whereas the probability of failure with this scheme
>> is 1 - Psuccess ^ 3.
>>
>> This addresses the biggest downsides of JBOD now:
>> * limiting a topic to the size of a single disk limits scalability
>> * the topic movement process is tricky to get right and involves "racing
>> against producers" and wasted double I/Os
>>
>> Of course, one other question is how frequently we add new disk drives
>> to an existing broker.  In this case, you might reasonably want disk
>> rebalancing to avoid overloading the new disk(s) with writes.
>>
>> cheers,
>> Colin
>>
>>
>> On Fri, Jun 9, 2017, at 18:46, Jun Rao wrote:
>> > Just a few comments on this.
>> >
>> > 1. One of the issues with using RAID 0 is that a single disk failure
>> > causes
>> > a hard failure of the broker. Hard failure increases the unavailability
>> > window for all the partitions on the failed broker, which includes the
>> > failure detection time (tied to ZK session timeout right now) and leader
>> > election time by the controller. If we support JBOD natively, when a
>> > single
>> > disk fails, only partitions on the failed disk will experience a hard
>> > failure. The availability for partitions on the rest of the disks are
>> not
>> > affected.
>> >
>> > 2. For running things on the Cloud such as AWS. Currently, each EBS
>> > volume
>> > has a throughput limit of about 300MB/sec. If you get an enhanced EC2
>> > instance, you can get 20Gb/sec network. To saturate the network, you may
>> > need about 7 EBS volumes. So, being able to support JBOD in the Cloud is
>> > still potentially useful.
>> >
>> > 3. On the benefit of balancing data across disks within the same broker.
>> > Data imbalance can happen across brokers as well as across disks within
>> > the
>> > same broker. Balancing the data across disks within the broker has the
>> > benefit of saving network bandwidth as Dong mentioned. So, if intra
>> > broker
>> > load balancing is possible, it's probably better to avoid the more
>> > expensive inter broker load balancing. One of the reasons for disk
>> > imbalance right now is that partitions within a broker are assigned to
>> > disks just based on the partition count. So, it does seem possible for
>> > disks to get imbalanced from time to time. If someone can share some
>> > stats
>> > for that in practice, that will be very helpful.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> >
>> > On Wed, Jun 7, 2017 at 2:30 PM, Dong Lin  wrote:
>> >
>> > > Hey Sriram,
>> > >
>> > > I think there is one way to explain why the ability to move replica
>> between
>> > > disks can save space. Let's say the load is distributed to disks
>> > > independent of the broker. Sooner or later, the load imbalance will
>> exceed
>> > > a threshold and we 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-07-12 Thread Dong Lin
The protocol change has been updated in KIP-113

.

On Wed, Jul 12, 2017 at 10:44 AM, Dong Lin  wrote:

> Hi all,
>
> I have made a minor change to the DescribeDirsRequest so that the user can
> choose to query the status for a specific list of partitions. This is a bit
> more fine-grained than the previous format, which allows the user to query the
> status for a specific list of topics. I realized that querying the status
> of selected partitions can be useful to check whether the reassignment
> of the replicas to the specific log directories has been completed.
>
> I will assume this minor change is OK if there is no concern with it in
> the community :)
>
> Thanks,
> Dong
>
>
>
> On Mon, Jun 12, 2017 at 10:46 AM, Dong Lin  wrote:
>
>> Hey Colin,
>>
>> Thanks for the suggestion. We have actually considered this and list this
>> as the first future work in KIP-112
>> .
>> The two advantages that you mentioned are exactly the motivation for this
>> feature. Also as you have mentioned, this involves the tradeoff between
>> disk performance and availability -- the more you distribute topic across
>> disks, the more topics will be offline due to a single disk failure.
>>
>> Despite its complexity, it is not clear to me that the reduced rebalance
>> overhead is worth the reduction in availability. I am optimistic that the
>> rebalance overhead will not be that a big problem since we are not too
>> bothered by cross-broker rebalance as of now.
>>
>> Thanks,
>> Dong
>>
>> On Mon, Jun 12, 2017 at 10:36 AM, Colin McCabe 
>> wrote:
>>
>>> Has anyone considered a scheme for sharding topic data across multiple
>>> disks?
>>>
>>> For example, if you sharded topics across 3 disks, and you had 10 disks,
>>> you could pick a different set of 3 disks for each topic.  If you
>>> distribute them randomly then you have 10 choose 3 = 120 different
>>> combinations.  You would probably never need rebalancing if you had a
>>> reasonable distribution of topic sizes (could probably prove this with a
>>> Monte Carlo or something).
>>>
>>> The disadvantage is that if one of the 3 disks fails, then you have to
>>> take the topic offline.  But if we assume independent disk failure
>>> probabilities, probability of failure with RAID 0 is: 1 -
>>> Psuccess^(num_disks) whereas the probability of failure with this scheme
>>> is 1 - Psuccess ^ 3.
>>>
>>> This addresses the biggest downsides of JBOD now:
>>> * limiting a topic to the size of a single disk limits scalability
>>> * the topic movement process is tricky to get right and involves "racing
>>> against producers" and wasted double I/Os
>>>
>>> Of course, one other question is how frequently we add new disk drives
>>> to an existing broker.  In this case, you might reasonably want disk
>>> rebalancing to avoid overloading the new disk(s) with writes.
>>>
>>> cheers,
>>> Colin
>>>
>>>
>>> On Fri, Jun 9, 2017, at 18:46, Jun Rao wrote:
>>> > Just a few comments on this.
>>> >
>>> > 1. One of the issues with using RAID 0 is that a single disk failure
>>> > causes
>>> > a hard failure of the broker. Hard failure increases the unavailability
>>> > window for all the partitions on the failed broker, which includes the
>>> > failure detection time (tied to ZK session timeout right now) and
>>> leader
>>> > election time by the controller. If we support JBOD natively, when a
>>> > single
>>> > disk fails, only partitions on the failed disk will experience a hard
>>> > failure. The availability for partitions on the rest of the disks is
>>> not
>>> > affected.
>>> >
>>> > 2. For running things on the Cloud such as AWS. Currently, each EBS
>>> > volume
>>> > has a throughput limit of about 300MB/sec. If you get an enhanced EC2
>>> > instance, you can get 20Gb/sec network. To saturate the network, you
>>> may
>>> > need about 7 EBS volumes. So, being able to support JBOD in the Cloud
>>> is
>>> > still potentially useful.
>>> >
>>> > 3. On the benefit of balancing data across disks within the same
>>> broker.
>>> > Data imbalance can happen across brokers as well as across disks within
>>> > the
>>> > same broker. Balancing the data across disks within the broker has the
>>> > benefit of saving network bandwidth as Dong mentioned. So, if intra
>>> > broker
>>> > load balancing is possible, it's probably better to avoid the more
>>> > expensive inter broker load balancing. One of the reasons for disk
>>> > imbalance right now is that partitions within a broker are assigned to
>>> > disks just based on the partition count. So, it does seem possible for
>>> > disks to get imbalanced from time to time. If someone can share some
>>> > stats
>>> > for that in practice, that will be very helpful.
>>> >
>>> > Thanks,
>>> >
>>> > Jun
>>> >
>>> 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-12 Thread Dong Lin
Hey Colin,

Thanks for the suggestion. We have actually considered this and listed it
as the first item of future work in KIP-112
.
The two advantages that you mentioned are exactly the motivation for this
feature. Also, as you have mentioned, this involves a tradeoff between
disk performance and availability -- the more you distribute a topic across
disks, the more topics will be offline due to a single disk failure.

Despite its complexity, it is not clear to me that the reduced rebalance
overhead is worth the reduction in availability. I am optimistic that the
rebalance overhead will not be that big a problem since we are not too
bothered by cross-broker rebalance as of now.

Thanks,
Dong

On Mon, Jun 12, 2017 at 10:36 AM, Colin McCabe  wrote:

> Has anyone considered a scheme for sharding topic data across multiple
> disks?
>
> For example, if you sharded topics across 3 disks, and you had 10 disks,
> you could pick a different set of 3 disks for each topic.  If you
> distribute them randomly then you have 10 choose 3 = 120 different
> combinations.  You would probably never need rebalancing if you had a
> reasonable distribution of topic sizes (could probably prove this with a
> Monte Carlo or something).
>
> The disadvantage is that if one of the 3 disks fails, then you have to
> take the topic offline.  But if we assume independent disk failure
> probabilities, probability of failure with RAID 0 is: 1 -
> Psuccess^(num_disks) whereas the probability of failure with this scheme
> is 1 - Psuccess ^ 3.
>
> This addresses the biggest downsides of JBOD now:
> * limiting a topic to the size of a single disk limits scalability
> * the topic movement process is tricky to get right and involves "racing
> against producers" and wasted double I/Os
>
> Of course, one other question is how frequently we add new disk drives
> to an existing broker.  In this case, you might reasonably want disk
> rebalancing to avoid overloading the new disk(s) with writes.
>
> cheers,
> Colin
>
>
> On Fri, Jun 9, 2017, at 18:46, Jun Rao wrote:
> > Just a few comments on this.
> >
> > 1. One of the issues with using RAID 0 is that a single disk failure
> > causes
> > a hard failure of the broker. Hard failure increases the unavailability
> > window for all the partitions on the failed broker, which includes the
> > failure detection time (tied to ZK session timeout right now) and leader
> > election time by the controller. If we support JBOD natively, when a
> > single
> > disk fails, only partitions on the failed disk will experience a hard
> > failure. The availability for partitions on the rest of the disks is not
> > affected.
> >
> > 2. For running things on the Cloud such as AWS. Currently, each EBS
> > volume
> > has a throughput limit of about 300MB/sec. If you get an enhanced EC2
> > instance, you can get 20Gb/sec network. To saturate the network, you may
> > need about 7 EBS volumes. So, being able to support JBOD in the Cloud is
> > still potentially useful.
> >
> > 3. On the benefit of balancing data across disks within the same broker.
> > Data imbalance can happen across brokers as well as across disks within
> > the
> > same broker. Balancing the data across disks within the broker has the
> > benefit of saving network bandwidth as Dong mentioned. So, if intra
> > broker
> > load balancing is possible, it's probably better to avoid the more
> > expensive inter broker load balancing. One of the reasons for disk
> > imbalance right now is that partitions within a broker are assigned to
> > disks just based on the partition count. So, it does seem possible for
> > disks to get imbalanced from time to time. If someone can share some
> > stats
> > for that in practice, that will be very helpful.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Jun 7, 2017 at 2:30 PM, Dong Lin  wrote:
> >
> > > Hey Sriram,
> > >
> > > I think there is one way to explain why the ability to move replica
> between
> > > disks can save space. Let's say the load is distributed to disks
> > > independent of the broker. Sooner or later, the load imbalance will
> exceed
> > > a threshold and we will need to rebalance load across disks. Now our
> > > question is whether our rebalancing algorithm will be able to take
> > > advantage of locality by moving replicas between disks on the same
> broker.
> > >
> > > Say for a given disk, there is 20% probability it is overloaded, 20%
> > > probability it is underloaded, and 60% probability its load is around
> the
> > > expected average load if the cluster is well balanced. Then for a
> broker of
> > 10 disks, we would expect 2 disks to need in-bound replica movement, 2
> disks
> > to need out-bound replica movement, and 6 disks to need no
> replica
> > > movement. Thus we would expect KIP-113 to be useful since we will be
> able
> > > to move replica from the two 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-12 Thread Colin McCabe
Has anyone considered a scheme for sharding topic data across multiple
disks?

For example, if you sharded topics across 3 disks, and you had 10 disks,
you could pick a different set of 3 disks for each topic.  If you
distribute them randomly then you have 10 choose 3 = 120 different
combinations.  You would probably never need rebalancing if you had a
reasonable distribution of topic sizes (could probably prove this with a
Monte Carlo or something).

The disadvantage is that if one of the 3 disks fails, then you have to
take the topic offline.  But if we assume independent disk failure
probabilities, probability of failure with RAID 0 is: 1 -
Psuccess^(num_disks) whereas the probability of failure with this scheme
is 1 - Psuccess ^ 3.
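
(As a quick sanity check of those numbers, a throwaway calculation; the 2%
per-disk failure probability plugged in below is an assumption taken from
elsewhere in this thread rather than from the scheme itself:)

public class DiskFailureMath {

    // n choose k, computed iteratively; exact for these small inputs.
    static long choose(int n, int k) {
        long result = 1;
        for (int i = 1; i <= k; i++) {
            result = result * (n - i + 1) / i;
        }
        return result;
    }

    public static void main(String[] args) {
        int disks = 10;
        int disksPerTopic = 3;
        double pDiskSurvives = 0.98;  // assumed 2% failure probability per disk

        // 10 choose 3 = 120 distinct 3-disk sets.
        System.out.println("3-disk sets out of 10 disks: " + choose(disks, disksPerTopic));

        // A topic becomes unavailable if any disk it touches fails.
        double pLostRaid0 = 1 - Math.pow(pDiskSurvives, disks);           // RAID 0: all 10 disks
        double pLostSharded = 1 - Math.pow(pDiskSurvives, disksPerTopic); // sharded: only its 3 disks
        System.out.printf("RAID 0: %.3f, sharded over 3 disks: %.3f%n",
                pLostRaid0, pLostSharded);
    }
}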

This addresses the biggest downsides of JBOD now:
* limiting a topic to the size of a single disk limits scalability
* the topic movement process is tricky to get right and involves "racing
against producers" and wasted double I/Os

Of course, one other question is how frequently we add new disk drives
to an existing broker.  In this case, you might reasonably want disk
rebalancing to avoid overloading the new disk(s) with writes.

cheers,
Colin


On Fri, Jun 9, 2017, at 18:46, Jun Rao wrote:
> Just a few comments on this.
> 
> 1. One of the issues with using RAID 0 is that a single disk failure
> causes
> a hard failure of the broker. Hard failure increases the unavailability
> window for all the partitions on the failed broker, which includes the
> failure detection time (tied to ZK session timeout right now) and leader
> election time by the controller. If we support JBOD natively, when a
> single
> disk fails, only partitions on the failed disk will experience a hard
> failure. The availability for partitions on the rest of the disks is not
> affected.
> 
> 2. For running things on the Cloud such as AWS. Currently, each EBS
> volume
> has a throughput limit of about 300MB/sec. If you get an enhanced EC2
> instance, you can get 20Gb/sec network. To saturate the network, you may
> need about 7 EBS volumes. So, being able to support JBOD in the Cloud is
> still potentially useful.
> 
> 3. On the benefit of balancing data across disks within the same broker.
> Data imbalance can happen across brokers as well as across disks within
> the
> same broker. Balancing the data across disks within the broker has the
> benefit of saving network bandwidth as Dong mentioned. So, if intra
> broker
> load balancing is possible, it's probably better to avoid the more
> expensive inter broker load balancing. One of the reasons for disk
> imbalance right now is that partitions within a broker are assigned to
> disks just based on the partition count. So, it does seem possible for
> disks to get imbalanced from time to time. If someone can share some
> stats
> for that in practice, that will be very helpful.
> 
> Thanks,
> 
> Jun
> 
> 
> On Wed, Jun 7, 2017 at 2:30 PM, Dong Lin  wrote:
> 
> > Hey Sriram,
> >
> > I think there is one way to explain why the ability to move replica between
> > disks can save space. Let's say the load is distributed to disks
> > independent of the broker. Sooner or later, the load imbalance will exceed
> > a threshold and we will need to rebalance load across disks. Now our
> > question is whether our rebalancing algorithm will be able to take
> > advantage of locality by moving replicas between disks on the same broker.
> >
> > Say for a given disk, there is 20% probability it is overloaded, 20%
> > probability it is underloaded, and 60% probability its load is around the
> > expected average load if the cluster is well balanced. Then for a broker of
> > 10 disks, we would expect 2 disks to need in-bound replica movement, 2 disks
> > to need out-bound replica movement, and 6 disks to need no replica
> > movement. Thus we would expect KIP-113 to be useful since we will be able
> > to move replica from the two over-loaded disks to the two under-loaded
> > disks on the same broker. Does this make sense?
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> >
> >
> > On Wed, Jun 7, 2017 at 2:12 PM, Dong Lin  wrote:
> >
> > > Hey Sriram,
> > >
> > > Thanks for raising these concerns. Let me answer these questions below:
> > >
> > > - The benefit of those additional complexity to move the data stored on a
> > > disk within the broker is to avoid network bandwidth usage. Creating
> > > replica on another broker is less efficient than creating replica on
> > > another disk in the same broker IF there is actually lightly-loaded disk
> > on
> > > the same broker.
> > >
> > > - In my opinion the rebalance algorithm would be this: 1) we balance the
> > load
> > > across brokers using the same algorithm we are using today. 2) we balance
> > > load across disk on a given broker using a greedy algorithm, i.e. move
> > > replica from the overloaded disk to lightly loaded disk. The greedy
> > > algorithm would only consider the capacity and 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-09 Thread Jun Rao
Just a few comments on this.

1. One of the issues with using RAID 0 is that a single disk failure causes
a hard failure of the broker. Hard failure increases the unavailability
window for all the partitions on the failed broker, which includes the
failure detection time (tied to ZK session timeout right now) and leader
election time by the controller. If we support JBOD natively, when a single
disk fails, only partitions on the failed disk will experience a hard
failure. The availability for partitions on the rest of the disks is not
affected.

2. For running things on the Cloud such as AWS. Currently, each EBS volume
has a throughput limit of about 300MB/sec. If you get an enhanced EC2
instance, you can get 20Gb/sec network. To saturate the network, you may
need about 7 EBS volumes. So, being able to support JBOD in the Cloud is
still potentially useful.

3. On the benefit of balancing data across disks within the same broker.
Data imbalance can happen across brokers as well as across disks within the
same broker. Balancing the data across disks within the broker has the
benefit of saving network bandwidth as Dong mentioned. So, if intra broker
load balancing is possible, it's probably better to avoid the more
expensive inter broker load balancing. One of the reasons for disk
imbalance right now is that partitions within a broker are assigned to
disks just based on the partition count. So, it does seem possible for
disks to get imbalanced from time to time. If someone can share some stats
for that in practice, that will be very helpful.

Thanks,

Jun


On Wed, Jun 7, 2017 at 2:30 PM, Dong Lin  wrote:

> Hey Sriram,
>
> I think there is one way to explain why the ability to move replica between
> disks can save space. Let's say the load is distributed to disks
> independent of the broker. Sooner or later, the load imbalance will exceed
> a threshold and we will need to rebalance load across disks. Now our
> question is whether our rebalancing algorithm will be able to take
> advantage of locality by moving replicas between disks on the same broker.
>
> Say for a given disk, there is 20% probability it is overloaded, 20%
> probability it is underloaded, and 60% probability its load is around the
> expected average load if the cluster is well balanced. Then for a broker of
> 10 disks, we would expect 2 disks to need in-bound replica movement, 2 disks
> to need out-bound replica movement, and 6 disks to need no replica
> movement. Thus we would expect KIP-113 to be useful since we will be able
> to move replica from the two over-loaded disks to the two under-loaded
> disks on the same broker. Does this make sense?
>
> Thanks,
> Dong
>
>
>
>
>
>
> On Wed, Jun 7, 2017 at 2:12 PM, Dong Lin  wrote:
>
> > Hey Sriram,
> >
> > Thanks for raising these concerns. Let me answer these questions below:
> >
> > - The benefit of those additional complexity to move the data stored on a
> > disk within the broker is to avoid network bandwidth usage. Creating
> > replica on another broker is less efficient than creating replica on
> > another disk in the same broker IF there is actually lightly-loaded disk
> on
> > the same broker.
> >
> > - In my opinion the rebalance algorithm would be this: 1) we balance the
> load
> > across brokers using the same algorithm we are using today. 2) we balance
> > load across disk on a given broker using a greedy algorithm, i.e. move
> > replica from the overloaded disk to lightly loaded disk. The greedy
> > algorithm would only consider the capacity and replica size. We can
> improve
> > it to consider throughput in the future.
> >
> > - With 30 brokers with each having 10 disks, using the rebalancing
> algorithm,
> > the chances of choosing disks within the broker can be high. There will
> > always be load imbalance across disks of the same broker for the same
> > reason that there will always be load imbalance across brokers. The
> > algorithm specified above will take advantage of the locality, i.e. first
> > balance load across disks of the same broker, and only balance across
> > brokers if some brokers are much more loaded than others.
> >
> > I think it is useful to note that the load imbalance across disks of the
> > same broker is independent of the load imbalance across brokers. Both are
> > guaranteed to happen in any Kafka cluster for the same reason, i.e.
> > variation in the partition size. Say broker 1 has two disks that are 80%
> > loaded and 20% loaded. And broker 2 has two disks that are also 80%
> > loaded and 20%. We can balance them without inter-broker traffic with
> > KIP-113.  This is why I think KIP-113 can be very useful.
> >
> > Do these explanations sound reasonable?
> >
> > Thanks,
> > Dong
> >
> >
> > On Wed, Jun 7, 2017 at 1:33 PM, Sriram Subramanian 
> > wrote:
> >
> >> Hey Dong,
> >>
> >> Thanks for the explanation. I don't think anyone is denying that we
> should
> >> rebalance at the disk 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-07 Thread Dong Lin
Hey Sriram,

I think there is one way to explain why the ability to move replica between
disks can save space. Let's say the load is distributed to disks
independent of the broker. Sooner or later, the load imbalance will exceed
a threshold and we will need to rebalance load across disks. Now our
question is whether our rebalancing algorithm will be able to take
advantage of locality by moving replicas between disks on the same broker.

Say for a given disk, there is 20% probability it is overloaded, 20%
probability it is underloaded, and 60% probability its load is around the
expected average load if the cluster is well balanced. Then for a broker of
10 disks, we would expect 2 disks to need in-bound replica movement, 2 disks
to need out-bound replica movement, and 6 disks to need no replica
movement. Thus we would expect KIP-113 to be useful since we will be able
to move replicas from the two over-loaded disks to the two under-loaded
disks on the same broker. Does this make sense?

Thanks,
Dong






On Wed, Jun 7, 2017 at 2:12 PM, Dong Lin  wrote:

> Hey Sriram,
>
> Thanks for raising these concerns. Let me answer these questions below:
>
> - The benefit of those additional complexity to move the data stored on a
> disk within the broker is to avoid network bandwidth usage. Creating
> replica on another broker is less efficient than creating replica on
> another disk in the same broker IF there is actually lightly-loaded disk on
> the same broker.
>
> - In my opinion the rebalance algorithm would be this: 1) we balance the load
> across brokers using the same algorithm we are using today. 2) we balance
> load across disk on a given broker using a greedy algorithm, i.e. move
> replica from the overloaded disk to lightly loaded disk. The greedy
> algorithm would only consider the capacity and replica size. We can improve
> it to consider throughput in the future.
>
> - With 30 brokers with each having 10 disks, using the rebalancing algorithm,
> the chances of choosing disks within the broker can be high. There will
> always be load imbalance across disks of the same broker for the same
> reason that there will always be load imbalance across brokers. The
> algorithm specified above will take advantage of the locality, i.e. first
> balance load across disks of the same broker, and only balance across
> brokers if some brokers are much more loaded than others.
>
> I think it is useful to note that the load imbalance across disks of the
> same broker is independent of the load imbalance across brokers. Both are
> guaranteed to happen in any Kafka cluster for the same reason, i.e.
> variation in the partition size. Say broker 1 has two disks that are 80%
> loaded and 20% loaded. And broker 2 has two disks that are also 80%
> loaded and 20%. We can balance them without inter-broker traffic with
> KIP-113.  This is why I think KIP-113 can be very useful.
>
> Do these explanations sound reasonable?
>
> Thanks,
> Dong
>
>
> On Wed, Jun 7, 2017 at 1:33 PM, Sriram Subramanian 
> wrote:
>
>> Hey Dong,
>>
>> Thanks for the explanation. I don't think anyone is denying that we should
>> rebalance at the disk level. I think it is important to restore the disk
>> and not wait for disk replacement. There are also other benefits of doing
>> that which is that you don't need to opt for hot swap racks that can save
>> cost.
>>
>> The question here is what do you save by trying to add complexity to move
>> the data stored on a disk within the broker? Why would you not simply
>> create another replica on the disk that results in a balanced load across
>> brokers and have it catch up. We are missing a few things here -
>> 1. What would your data balancing algorithm be? Would it include just
>> capacity or will it also consider throughput on disk to decide on the
>> final
>> location of a partition?
>> 2. With 30 brokers with each having 10 disks, using the rebalancing
>> algorithm, the chances of choosing disks within the broker is going to be
>> low. This probability further decreases with more brokers and disks. Given
>> that, why are we trying to save network cost? How much would that saving
>> be
>> if you go that route?
>>
>> These questions are hard to answer without having to verify empirically.
>> My
>> suggestion is to avoid doing premature optimization that brings in the
>> added complexity to the code and treat inter and intra broker movements of
>> partition the same. Deploy the code, use it and see if it is an actual
>> problem and you get great savings by avoiding the network route to move
>> partitions within the same broker. If so, add this optimization.
>>
>> On Wed, Jun 7, 2017 at 1:03 PM, Dong Lin  wrote:
>>
>> > Hey Jay, Sriram,
>> >
>> > Great point. If I understand you right, you are suggesting that we can
>> > simply use RAID-0 so that the load can be evenly distributed across
>> disks.
>> > And even though a disk failure will bring down 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-07 Thread Dong Lin
Hey Sriram,

Thanks for raising these concerns. Let me answer these questions below:

- The benefit of the additional complexity to move the data stored on a
disk within the broker is to avoid network bandwidth usage. Creating a
replica on another broker is less efficient than creating a replica on
another disk in the same broker IF there is actually a lightly-loaded disk on
the same broker.

- In my opinion the rebalance algorithm would be this: 1) we balance the load
across brokers using the same algorithm we are using today. 2) we balance
load across disks on a given broker using a greedy algorithm, i.e. move
replicas from the overloaded disk to a lightly loaded disk. The greedy
algorithm would only consider the capacity and replica size. We can improve
it to consider throughput in the future.
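
To make step 2) concrete, here is a rough sketch of that greedy intra-broker
pass; the types and the data-movement hook are hypothetical placeholders, and
only capacity and replica size are considered, as described above:

import java.util.Comparator;
import java.util.List;

public class IntraBrokerRebalancer {

    // Hypothetical in-memory view of a broker's log directories.
    record Replica(String topic, int partition, long sizeBytes) {}
    record LogDir(String path, long capacityBytes, List<Replica> replicas) {
        long usedBytes() { return replicas.stream().mapToLong(Replica::sizeBytes).sum(); }
    }

    // Repeatedly move the largest replica that fits from the most loaded
    // directory to the least loaded one, stopping when no move helps.
    static void rebalance(List<LogDir> dirs) {
        if (dirs.isEmpty()) return;
        while (true) {
            LogDir most = dirs.stream().max(Comparator.comparingLong(LogDir::usedBytes)).get();
            LogDir least = dirs.stream().min(Comparator.comparingLong(LogDir::usedBytes)).get();
            long gap = most.usedBytes() - least.usedBytes();
            Replica candidate = most.replicas().stream()
                    .filter(r -> r.sizeBytes() * 2 <= gap)  // never overshoot the target
                    .filter(r -> least.usedBytes() + r.sizeBytes() <= least.capacityBytes())
                    .max(Comparator.comparingLong(Replica::sizeBytes))
                    .orElse(null);
            if (candidate == null) return;  // balanced enough, or nothing movable
            most.replicas().remove(candidate);   // assumes the lists are mutable
            least.replicas().add(candidate);     // a real tool would send ChangeReplicaDirRequest here
        }
    }
}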

- With 30 brokers each having 10 disks, using the rebalancing algorithm,
the chances of choosing disks within the broker can be high. There will
always be load imbalance across disks of the same broker for the same
reason that there will always be load imbalance across brokers. The
algorithm specified above will take advantage of the locality, i.e. first
balance load across disks of the same broker, and only balance across
brokers if some brokers are much more loaded than others.

I think it is useful to note that the load imbalance across disks of the
same broker is independent of the load imbalance across brokers. Both are
guaranteed to happen in any Kafka cluster for the same reason, i.e.
variation in the partition size. Say broker 1 has two disks that are 80%
loaded and 20% loaded. And broker 2 has two disks that are also 80% loaded
and 20%. We can balance them without inter-broker traffic with KIP-113.
This is why I think KIP-113 can be very useful.

Do these explanations sound reasonable?

Thanks,
Dong


On Wed, Jun 7, 2017 at 1:33 PM, Sriram Subramanian  wrote:

> Hey Dong,
>
> Thanks for the explanation. I don't think anyone is denying that we should
> rebalance at the disk level. I think it is important to restore the disk
> and not wait for disk replacement. There are also other benefits of doing
> that which is that you don't need to opt for hot swap racks that can save
> cost.
>
> The question here is what do you save by trying to add complexity to move
> the data stored on a disk within the broker? Why would you not simply
> create another replica on the disk that results in a balanced load across
> brokers and have it catch up. We are missing a few things here -
> 1. What would your data balancing algorithm be? Would it include just
> capacity or will it also consider throughput on disk to decide on the final
> location of a partition?
> 2. With 30 brokers, each having 10 disks, using the rebalancing
> algorithm, the chances of choosing disks within the broker are going to be
> low. This probability further decreases with more brokers and disks. Given
> that, why are we trying to save network cost? How much would that saving be
> if you go that route?
>
> These questions are hard to answer without verifying empirically. My
> suggestion is to avoid doing premature optimization that brings in the
> added complexity to the code and treat inter- and intra-broker movements of
> partitions the same. Deploy the code, use it and see if it is an actual
> problem and whether you get great savings by avoiding the network route to move
> partitions within the same broker. If so, add this optimization.
>
> On Wed, Jun 7, 2017 at 1:03 PM, Dong Lin  wrote:
>
> > Hey Jay, Sriram,
> >
> > Great point. If I understand you right, you are suggesting that we can
> > simply use RAID-0 so that the load can be evenly distributed across
> disks.
> > And even though a disk failure will bring down the entire broker, the
> > reduced availability as compared to using KIP-112 and KIP-113 may be
> > negligible. And it may be better to just accept the slightly reduced
> > availability instead of introducing the complexity from KIP-112 and
> > KIP-113.
> >
> > Let's assume the following:
> >
> > - There are 30 brokers in a cluster and each broker has 10 disks
> > - The replication factor is 3 and min.isr = 2.
> > - The annual disk failure rate is 2% according to this
> > blog.
> > - It takes 3 days to replace a disk.
> >
> > Here is my calculation for the probability of data loss due to disk failure:
> > probability that a given disk fails in a given year: 2%
> > probability that a given disk stays offline on a given day: 2% / 365 * 3
> > probability that a given broker stays offline on a given day due to disk
> > failure: 2% / 365 * 3 * 10
> > probability that any broker stays offline on a given day due to disk
> > failure: 2% / 365 * 3 * 10 * 30 = 5%
> > probability that any three brokers stay offline on a given day due to disk
> > failure: 5% * 5% * 5% = 0.0125%
> > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-07 Thread Sriram Subramanian
Hey Dong,

Thanks for the explanation. I don't think anyone is denying that we should
rebalance at the disk level. I think it is important to restore the disk
and not wait for disk replacement. There is also another benefit of doing
that, which is that you don't need to opt for hot-swap racks, and that can save
cost.

The question here is what do you save by trying to add complexity to move
the data stored on a disk within the broker? Why would you not simply
create another replica on a disk that results in a balanced load across
brokers and have it catch up? We are missing a few things here -
1. What would your data balancing algorithm be? Would it include just
capacity or will it also consider throughput on disk to decide on the final
location of a partition?
2. With 30 brokers, each having 10 disks, using the rebalancing
algorithm, the chances of choosing disks within the broker are going to be
low. This probability further decreases with more brokers and disks. Given
that, why are we trying to save network cost? How much would that saving be
if you go that route?

These questions are hard to answer without verifying empirically. My
suggestion is to avoid doing premature optimization that brings in the
added complexity to the code and treat inter- and intra-broker movements of
partitions the same. Deploy the code, use it and see if it is an actual
problem and whether you get great savings by avoiding the network route to move
partitions within the same broker. If so, add this optimization.

On Wed, Jun 7, 2017 at 1:03 PM, Dong Lin  wrote:

> Hey Jay, Sriram,
>
> Great point. If I understand you right, you are suggesting that we can
> simply use RAID-0 so that the load can be evenly distributed across disks.
> And even though a disk failure will bring down the entire broker, the
> reduced availability as compared to using KIP-112 and KIP-113 may be
> negligible. And it may be better to just accept the slightly reduced
> availability instead of introducing the complexity from KIP-112 and
> KIP-113.
>
> Let's assume the following:
>
> - There are 30 brokers in a cluster and each broker has 10 disks
> - The replication factor is 3 and min.isr = 2.
> - The annual disk failure rate is 2% according to this
> blog.
> - It takes 3 days to replace a disk.
>
> Here is my calculation for the probability of data loss due to disk failure:
> probability that a given disk fails in a given year: 2%
> probability that a given disk stays offline on a given day: 2% / 365 * 3
> probability that a given broker stays offline on a given day due to disk
> failure: 2% / 365 * 3 * 10
> probability that any broker stays offline on a given day due to disk
> failure: 2% / 365 * 3 * 10 * 30 = 5%
> probability that any three brokers stay offline on a given day due to disk
> failure: 5% * 5% * 5% = 0.0125%
> probability of data loss due to disk failure: 0.0125%
>
> Here is my calculation for the probability of service unavailability due to
> disk failure:
> probability that a given disk fails in a given year: 2%
> probability that a given disk stays offline on a given day: 2% / 365 * 3
> probability that a given broker stays offline on a given day due to disk
> failure: 2% / 365 * 3 * 10
> probability that any broker stays offline on a given day due to disk
> failure: 2% / 365 * 3 * 10 * 30 = 5%
> probability that any two brokers stay offline on a given day due to disk
> failure: 5% * 5% = 0.25%
> probability of unavailability due to disk failure: 0.25%
>
> Note that the unavailability due to disk failure will be unacceptably high
> in this case. And the probability of data loss due to disk failure will be
> higher than 0.01%. Neither is acceptable if Kafka is intended to achieve
> four nines of availability.
>
> Thanks,
> Dong
>
>
> On Tue, Jun 6, 2017 at 11:26 PM, Jay Kreps  wrote:
>
> > I think Ram's point is that in place failure is pretty complicated, and
> > this is meant to be a cost saving feature, we should construct an
> argument
> > for it grounded in data.
> >
> > Assume an annual failure rate of 1% (reasonable, but data is available
> > online), and assume it takes 3 days to get the drive replaced. Say you
> have
> > 10 drives per server. Then the expected downtime for each server is
> roughly
> > 1% * 3 days * 10 = 0.3 days/year (this is slightly off since I'm ignoring
> > the case of multiple failures, but I don't know that changes it much). So
> > the savings from this feature is 0.3/365 = 0.08%. Say you have 1000
> servers
> > and they cost $3000/year fully loaded including power, the cost of the hw
> > amortized over its life, etc. Then this feature saves you $3000 on your
> > total server cost of $3m which seems not very worthwhile compared to
> other
> > optimizations...?
> >
> > Anyhow, not sure the arithmetic is right 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-07 Thread Dong Lin
Hey Jay, Sriram,

Great point. If I understand you right, you are suggesting that we can
simply use RAID-0 so that the load can be evenly distributed across disks.
And even though a disk failure will bring down the entire broker, the
reduced availability as compared to using KIP-112 and KIP-113 may be
negligible. And it may be better to just accept the slightly reduced
availability instead of introducing the complexity from KIP-112 and KIP-113.

Let's assume the following:

- There are 30 brokers in a cluster and each broker has 10 disks
- The replication factor is 3 and min.isr = 2.
- The annual disk failure rate is 2% according to this
blog.
- It takes 3 days to replace a disk.

Here is my calculation for the probability of data loss due to disk failure:
probability that a given disk fails in a given year: 2%
probability that a given disk stays offline on a given day: 2% / 365 * 3
probability that a given broker stays offline on a given day due to disk
failure: 2% / 365 * 3 * 10
probability that any broker stays offline on a given day due to disk
failure: 2% / 365 * 3 * 10 * 30 = 5%
probability that any three brokers stay offline on a given day due to disk
failure: 5% * 5% * 5% = 0.0125%
probability of data loss due to disk failure: 0.0125%

Here is my calculation for the probability of service unavailability due to
disk failure:
probability that a given disk fails in a given year: 2%
probability that a given disk stays offline on a given day: 2% / 365 * 3
probability that a given broker stays offline on a given day due to disk
failure: 2% / 365 * 3 * 10
probability that any broker stays offline on a given day due to disk
failure: 2% / 365 * 3 * 10 * 30 = 5%
probability that any two brokers stay offline on a given day due to disk
failure: 5% * 5% = 0.25%
probability of unavailability due to disk failure: 0.25%
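
For reference, here is a quick script that double-checks the arithmetic above
under the same assumptions (and treating broker outages as independent, which
is a simplification):

public class JbodFailureMath {
    public static void main(String[] args) {
        double annualDiskFailureRate = 0.02;
        int daysToReplace = 3, disksPerBroker = 10, brokers = 30;

        double pDiskOfflineToday = annualDiskFailureRate / 365 * daysToReplace;  // ~0.016%
        double pBrokerOfflineToday = pDiskOfflineToday * disksPerBroker;         // ~0.16%
        double pAnyBrokerOffline = pBrokerOfflineToday * brokers;                // ~5%
        double pThreeBrokersOffline = Math.pow(pAnyBrokerOffline, 3);            // ~0.0125% -> data loss
        double pTwoBrokersOffline = Math.pow(pAnyBrokerOffline, 2);              // ~0.25%  -> unavailability

        System.out.printf("any broker offline on a given day: %.2f%%%n", pAnyBrokerOffline * 100);
        System.out.printf("data loss (three brokers down):    %.4f%%%n", pThreeBrokersOffline * 100);
        System.out.printf("unavailable (two brokers down):    %.4f%%%n", pTwoBrokersOffline * 100);
    }
}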

Note that the unavailability due to disk failure will be unacceptably high
in this case. And the probability of data loss due to disk failure will be
higher than 0.01%. Neither is acceptable if Kafka is intended to achieve
four nines of availability.

Thanks,
Dong


On Tue, Jun 6, 2017 at 11:26 PM, Jay Kreps  wrote:

> I think Ram's point is that in place failure is pretty complicated, and
> this is meant to be a cost saving feature, we should construct an argument
> for it grounded in data.
>
> Assume an annual failure rate of 1% (reasonable, but data is available
> online), and assume it takes 3 days to get the drive replaced. Say you have
> 10 drives per server. Then the expected downtime for each server is roughly
> 1% * 3 days * 10 = 0.3 days/year (this is slightly off since I'm ignoring
> the case of multiple failures, but I don't know that changes it much). So
> the savings from this feature is 0.3/365 = 0.08%. Say you have 1000 servers
> and they cost $3000/year fully loaded including power, the cost of the hw
> amortized over its life, etc. Then this feature saves you $3000 on your
> total server cost of $3m which seems not very worthwhile compared to other
> optimizations...?
>
> Anyhow, not sure the arithmetic is right there, but I think that is the
> type of argument that would be helpful to think about the tradeoff in
> complexity.
>
> -Jay
>
>
>
> On Tue, Jun 6, 2017 at 7:09 PM, Dong Lin  wrote:
>
> > Hey Sriram,
> >
> > Thanks for taking time to review the KIP. Please see below my answers to
> > your questions:
> >
> > >1. Could you pick a hardware/Kafka configuration and go over what is the
> > >average disk/partition repair/restore time that we are targeting for a
> > >typical JBOD setup?
> >
> > We currently don't have this data. I think the disk/partition
> repair/restore
> > time depends on availability of hardware, the response time of
> > site-reliability engineer, the amount of data on the bad disk etc. These
> > vary between companies and even clusters within the same company and it
> is
> > probably hard to determine what is the average situation.
> >
> > I am not very sure why we need this. Can you explain a bit why this data
> is
> > useful to evaluate the motivation and design of this KIP?
> >
> > >2. How often do we believe disks are going to fail (in your example
> > >configuration) and what do we gain by avoiding the network overhead and
> > >doing all the work of moving the replica within the broker to another
> disk
> > >instead of balancing it globally?
> >
> > I think the chance of disk failure depends mainly on the disk itself
> rather
> > than the broker configuration. I don't have this data now. I will ask our
> > SRE whether they know the mean-time-to-fail for our disk. What I was told
> > by SRE is that disk failure is the most common type of hardware failure.
> >
> > When there is disk failure, I think it is reasonable to move replica to
> > another broker instead 

RE: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-07 Thread Yu, Mason
All:

 Solid state drives

 Depreciation - straightline vs accelerated

 Cost of capital - present value versus future value of the dollar
 Outright buy or loan from a bank (interest rate of the loan)

 Frequency of the SATA drive failure

 Cost of the server farm - office space, raised floors

  Energy consumption of the server farm

  Cost of racks (42U), networking: 10 GigE / 1 GigE Ethernet

  Primary data center vs disaster recovery (DR)

  On-prem vs Google Cloud, Microsoft Azure vs Amazon AWS

  Cost of labor on-prem - salaries of dev ops personnel


  - M

  

-Original Message-
From: Jay Kreps [mailto:j...@confluent.io] 
Sent: Wednesday, June 07, 2017 2:27 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-113: Support replicas movement between log 
directories

I think Ram's point is that in place failure is pretty complicated, and this is 
meant to be a cost saving feature, we should construct an argument for it 
grounded in data.

Assume an annual failure rate of 1% (reasonable, but data is available online), 
and assume it takes 3 days to get the drive replaced. Say you have
10 drives per server. Then the expected downtime for each server is roughly 1% 
* 3 days * 10 = 0.3 days/year (this is slightly off since I'm ignoring the case 
of multiple failures, but I don't know that changes it much). So the savings 
from this feature is 0.3/365 = 0.08%. Say you have 1000 servers and they cost 
$3000/year fully loaded including power, the cost of the hw amortized over its 
life, etc. Then this feature saves you $3000 on your total server cost of $3m 
which seems not very worthwhile compared to other optimizations...?

Anyhow, not sure the arithmetic is right there, but I think that is the type of 
argument that would be helpful to think about the tradeoff in complexity.

-Jay



On Tue, Jun 6, 2017 at 7:09 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Sriram,
>
> Thanks for taking time to review the KIP. Please see below my answers 
> to your questions:
>
> >1. Could you pick a hardware/Kafka configuration and go over what is 
> >the average disk/partition repair/restore time that we are targeting 
> >for a typical JBOD setup?
>
> We currently don't have this data. I think the disk/partition 
> repair/restore time depends on availability of hardware, the response 
> time of site-reliability engineer, the amount of data on the bad disk 
> etc. These vary between companies and even clusters within the same 
> company and it is probably hard to determine what is the average situation.
>
> I am not very sure why we need this. Can you explain a bit why this 
> data is useful to evaluate the motivation and design of this KIP?
>
> >2. How often do we believe disks are going to fail (in your example
> >configuration) and what do we gain by avoiding the network overhead 
> >and doing all the work of moving the replica within the broker to 
> >another disk instead of balancing it globally?
>
> I think the chance of disk failure depends mainly on the disk itself 
> rather than the broker configuration. I don't have this data now. I 
> will ask our SRE whether they know the mean-time-to-fail for our disk. 
> What I was told by SRE is that disk failure is the most common type of 
> hardware failure.
>
> When there is disk failure, I think it is reasonable to move replica 
> to another broker instead of another disk on the same broker. The 
> reason we want to move replica within broker is mainly to optimize the 
> Kafka cluster performance when we balance load across disks.
>
> In comparison to balancing replicas globally, the benefit of moving 
> replica within broker is that:
>
> 1) the movement is faster since it doesn't go through socket or rely 
> on the available network bandwidth;
> 2) much less impact on the replication traffic between broker by not 
> taking up bandwidth between brokers. Depending on the pattern of 
> traffic, we may need to balance load across disk frequently and it is 
> necessary to prevent this operation from slowing down the existing 
> operation (e.g. produce, consume, replication) in the Kafka cluster.
> 3) It gives us opportunity to do automatic broker rebalance between 
> disks on the same broker.
>
>
> >3. Even if we had to move the replica within the broker, why cannot 
> >we
> just
> >treat it as another replica and have it go through the same 
> >replication code path that we have today? The downside here is 
> >obviously that you need to catchup from the leader but it is 
> >completely free! What do we think is the impact of the network overhead in 
> >this case?
>
> Good point. My initial proposal actually used the existing 
> ReplicaFetcherTh

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-07 Thread Jay Kreps
I think Ram's point is that in place failure is pretty complicated, and
this is meant to be a cost saving feature, we should construct an argument
for it grounded in data.

Assume an annual failure rate of 1% (reasonable, but data is available
online), and assume it takes 3 days to get the drive replaced. Say you have
10 drives per server. Then the expected downtime for each server is roughly
1% * 3 days * 10 = 0.3 days/year (this is slightly off since I'm ignoring
the case of multiple failures, but I don't know that changes it much). So
the savings from this feature is 0.3/365 = 0.08%. Say you have 1000 servers
and they cost $3000/year fully loaded including power, the cost of the hw
amortized over its life, etc. Then this feature saves you $3000 on your
total server cost of $3m which seems not very worthwhile compared to other
optimizations...?
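
For reference, a quick check of that arithmetic, with the assumptions spelled
out as code (the class name is just for illustration):

public class DowntimeCostEstimate {
    public static void main(String[] args) {
        double annualDriveFailureRate = 0.01;
        int daysToReplace = 3, drivesPerServer = 10, servers = 1000;
        double costPerServerPerYear = 3000.0;

        double downDaysPerServerYear = annualDriveFailureRate * daysToReplace * drivesPerServer; // 0.3
        double capacityLostFraction = downDaysPerServerYear / 365.0;                             // ~0.08%
        double yearlySavings = capacityLostFraction * servers * costPerServerPerYear;            // ~$2,500

        System.out.printf("capacity lost to drive failures: %.3f%%%n", capacityLostFraction * 100);
        System.out.printf("approximate yearly savings: $%.0f of $%.0f total%n",
                yearlySavings, servers * costPerServerPerYear);
    }
}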

Anyhow, not sure the arithmetic is right there, but I think that is the
type of argument that would be helpful to think about the tradeoff in
complexity.

-Jay



On Tue, Jun 6, 2017 at 7:09 PM, Dong Lin  wrote:

> Hey Sriram,
>
> Thanks for taking time to review the KIP. Please see below my answers to
> your questions:
>
> >1. Could you pick a hardware/Kafka configuration and go over what is the
> >average disk/partition repair/restore time that we are targeting for a
> >typical JBOD setup?
>
> We currently don't have this data. I think the disk/partition repair/restore
> time depends on availability of hardware, the response time of
> site-reliability engineer, the amount of data on the bad disk etc. These
> vary between companies and even clusters within the same company and it is
> probably hard to determine what is the average situation.
>
> I am not very sure why we need this. Can you explain a bit why this data is
> useful to evaluate the motivation and design of this KIP?
>
> >2. How often do we believe disks are going to fail (in your example
> >configuration) and what do we gain by avoiding the network overhead and
> >doing all the work of moving the replica within the broker to another disk
> >instead of balancing it globally?
>
> I think the chance of disk failure depends mainly on the disk itself rather
> than the broker configuration. I don't have this data now. I will ask our
> SRE whether they know the mean-time-to-fail for our disk. What I was told
> by SRE is that disk failure is the most common type of hardware failure.
>
> When there is disk failure, I think it is reasonable to move replica to
> another broker instead of another disk on the same broker. The reason we
> want to move replica within broker is mainly to optimize the Kafka cluster
> performance when we balance load across disks.
>
> In comparison to balancing replicas globally, the benefit of moving replica
> within broker is that:
>
> 1) the movement is faster since it doesn't go through socket or rely on the
> available network bandwidth;
> 2) much less impact on the replication traffic between broker by not taking
> up bandwidth between brokers. Depending on the pattern of traffic, we may
> need to balance load across disk frequently and it is necessary to prevent
> this operation from slowing down the existing operation (e.g. produce,
> consume, replication) in the Kafka cluster.
> 3) It gives us opportunity to do automatic broker rebalance between disks
> on the same broker.
>
>
> >3. Even if we had to move the replica within the broker, why cannot we
> just
> >treat it as another replica and have it go through the same replication
> >code path that we have today? The downside here is obviously that you need
> >to catchup from the leader but it is completely free! What do we think is
> >the impact of the network overhead in this case?
>
> Good point. My initial proposal actually used the existing
> ReplicaFetcherThread (i.e. the existing code path) to move replica between
> disks. However, I switched to use separate thread pool after discussion
> with Jun and Becket.
>
> The main argument for using a separate thread pool is to actually keep the
> design simple and easy to reason about. There are a number of differences
> between inter-broker replication and intra-broker replication which make
> it cleaner to do them in separate code paths. I will list them below:
>
> - The throttling mechanism for inter-broker replication traffic and
> intra-broker replication traffic is different. For example, we may want to
> specify per-topic quota for inter-broker replication traffic because we may
> want some topic to be moved faster than other topic. But we don't care
> about priority of topics for intra-broker movement. So the current proposal
> only allows user to specify per-broker quota for intra-broker replication
> traffic.
>
> - The quota value for inter-broker replication traffic and intra-broker
> replication traffic is different. The available bandwidth for intra-broker
> replication can probably be much higher than the bandwidth for inter-broker
> 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-06 Thread Dong Lin
Hey Sriram,

Thanks for taking time to review the KIP. Please see below my answers to
your questions:

>1. Could you pick a hardware/Kafka configuration and go over what is the
>average disk/partition repair/restore time that we are targeting for a
>typical JBOD setup?

We currently don't have this data. I think the disk/partition repair/restore
time depends on availability of hardware, the response time of
site-reliability engineer, the amount of data on the bad disk etc. These
vary between companies and even clusters within the same company and it is
probably hard to determine what is the average situation.

I am not very sure why we need this. Can you explain a bit why this data is
useful to evaluate the motivation and design of this KIP?

>2. How often do we believe disks are going to fail (in your example
>configuration) and what do we gain by avoiding the network overhead and
>doing all the work of moving the replica within the broker to another disk
>instead of balancing it globally?

I think the chance of disk failure depends mainly on the disk itself rather
than the broker configuration. I don't have this data now. I will ask our
SRE whether they know the mean-time-to-fail for our disk. What I was told
by SRE is that disk failure is the most common type of hardware failure.

When there is disk failure, I think it is reasonable to move replica to
another broker instead of another disk on the same broker. The reason we
want to move replica within broker is mainly to optimize the Kafka cluster
performance when we balance load across disks.

In comparison to balancing replicas globally, the benefit of moving replica
within broker is that:

1) the movement is faster since it doesn't go through socket or rely on the
available network bandwidth;
2) much less impact on the replication traffic between broker by not taking
up bandwidth between brokers. Depending on the pattern of traffic, we may
need to balance load across disk frequently and it is necessary to prevent
this operation from slowing down the existing operation (e.g. produce,
consume, replication) in the Kafka cluster.
3) It gives us the opportunity to do automatic rebalancing between disks
on the same broker.


>3. Even if we had to move the replica within the broker, why cannot we just
>treat it as another replica and have it go through the same replication
>code path that we have today? The downside here is obviously that you need
>to catchup from the leader but it is completely free! What do we think is
>the impact of the network overhead in this case?

Good point. My initial proposal actually used the existing
ReplicaFetcherThread (i.e. the existing code path) to move replica between
disks. However, I switched to use separate thread pool after discussion
with Jun and Becket.

The main argument for using a separate thread pool is to actually keep the
design simple and easy to reason about. There are a number of differences
between inter-broker replication and intra-broker replication which make
it cleaner to do them in separate code paths. I will list them below:

- The throttling mechanism for inter-broker replication traffic and
intra-broker replication traffic is different. For example, we may want to
specify per-topic quota for inter-broker replication traffic because we may
want some topic to be moved faster than other topic. But we don't care
about priority of topics for intra-broker movement. So the current proposal
only allows user to specify per-broker quota for intra-broker replication
traffic.

- The quota value for inter-broker replication traffic and intra-broker
replication traffic is different. The available bandwidth for intra-broker
replication can probably be much higher than the bandwidth for inter-broker
replication.

- The ReplicaFetcherThread is per broker. Intuitively, the number of threads
doing intra-broker data movement should be related to the number of disks
in the broker, not the number of brokers in the cluster.

- The leader replica has no ReplicaFetcherThread to start with. It seems weird to
start one just for intra-broker replication.

Because of these differences, we think it is simpler to use a separate thread
pool and code path so that we can configure and throttle them separately.
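
To illustrate the "separate pool, separate throttle" point, here is a
hypothetical sketch (not Kafka code); the quota field corresponds to the
proposed intra.broker.throttled.rate:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ReplicaMovementPools {
    final ExecutorService interBrokerFetchers;      // existing fetcher path, throttled by the
                                                    // inter-broker replication quotas
    final ExecutorService intraBrokerMovers;        // sized by the number of log dirs, not brokers
    final long intraBrokerThrottledRateBytesPerSec; // separate quota for disk-to-disk movement

    ReplicaMovementPools(int numReplicaFetchers, int numLogDirs, long intraBrokerRateBytesPerSec) {
        this.interBrokerFetchers = Executors.newFixedThreadPool(numReplicaFetchers);
        this.intraBrokerMovers = Executors.newFixedThreadPool(numLogDirs);
        this.intraBrokerThrottledRateBytesPerSec = intraBrokerRateBytesPerSec;
    }
}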


>4. What are the chances that we will be able to identify another disk to
>balance within the broker instead of another disk on another broker? If we
>have 100's of machines, the probability of finding a better balance by
>choosing another broker is much higher than balancing within the broker.
>Could you add some info on how we are determining this?

It is possible that we can find available space on a remote broker. The
benefit of allowing intra-broker replication is that, when there is
available space on both the current broker and a remote broker, the
rebalance can be completed faster with much less impact on the inter-broker
replication or the users' traffic. It is about taking advantage of locality
when balancing the load.

>5. 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-30 Thread Dong Lin
Thanks Jun!

Hi all,

Thanks for all the comments. I am going to open the voting thread if there
is no further concern with the KIP.

Dong

On Thu, Mar 30, 2017 at 3:19 PM, Jun Rao  wrote:

> Hi, Dong,
>
> I don't have further concerns. If there are no more comments from other
> people, we can start the vote.
>
> Thanks,
>
> Jun
>
> On Thu, Mar 30, 2017 at 10:59 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks much for the comment! Do you think we start vote for KIP-112 and
> > KIP-113 if there is no further concern?
> >
> > Dong
> >
> > On Thu, Mar 30, 2017 at 10:40 AM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > Ok, so it seems that in solution (2), if the tool exits successfully,
> > then
> > > we know for sure that all replicas will be in the right log dirs.
> > Solution
> > > (1) doesn't guarantee that. That seems better and we can go with your
> > > current solution then.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Mar 24, 2017 at 4:28 PM, Dong Lin  wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > No.. the current approach described in the KIP (see here
> > > >  > > > 3A+Support+replicas+movement+between+log+directories#KIP-
> > > > 113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreas
> > > > signreplicabetweenlogdirectoriesacrossbrokers>)
> > > > also sends ChangeReplicaDirRequest before writing reassignment path
> in
> > > ZK.
> > > > I think we are discussing whether ChangeReplicaDirResponse (1) shows
> > success
> > > or
> > > > (2) should specify ReplicaNotAvailableException, if replica has not
> > been
> > > > created yet.
> > > >
> > > > Since both solutions will send ChangeReplicaDirRequest before writing
> > > > reassignment in ZK, their chance of creating replica in the right
> > > directory
> > > > is the same.
> > > >
> > > > To take care of the rarer case that some brokers go down immediately
> > > after
> > > > the reassignment tool is run, solution (1) requires reassignment tool
> > to
> > > > repeatedly send DescribeDirRequest and ChangeReplicaDirRequest, while
> > > > solution (2) requires tool to only retry ChangeReplicaDirRequest if
> the
> > > > response says ReplicaNotAvailableException. It seems that solution
> (2)
> > is
> > > > cleaner because ChangeReplicaDirRequest won't depend on
> > > DescribeDirRequest.
> > > > What do you think?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > We are just comparing whether it's better for the reassignment tool
> > to
> > > > > send ChangeReplicaDirRequest
> > > > > (1) before or (2) after writing the reassignment path in ZK .
> > > > >
> > > > > In the case when all brokers are alive when the reassignment tool
> is
> > > run,
> > > > > (1) guarantees 100% that the new replicas will be in the right log
> > dirs
> > > > and
> > > > > (2) can't.
> > > > >
> > > > > In the rarer case that some brokers go down immediately after the
> > > > > reassignment tool is run, in either approach, there is a chance
> when
> > > the
> > > > > failed broker comes back, it will complete the pending reassignment
> > > > process
> > > > > by putting some replicas in the wrong log dirs.
> > > > >
> > > > > Implementation wise, (1) and (2) seem to be the same. So, it seems
> to
> > > me
> > > > > that (1) is better?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin 
> > > wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks much for the response! I agree with you that if multiple
> > > > replicas
> > > > > > are created in the wrong directory, we may waste resource if
> either
> > > > > > replicaMoveThread number is low or intra.broker.throttled.rate is
> > > slow.
> > > > > > Then the question is whether the suggested approach increases the
> > > > chance
> > > > > of
> > > > > > replica being created in the correct log directory.
> > > > > >
> > > > > > I think the answer is no due to the argument provided in the
> > previous
> > > > > > email. Sending ChangeReplicaDirRequest before updating znode has
> > > > > negligible
> > > > > > impact on the chance that the broker processes
> > > ChangeReplicaDirRequest
> > > > > > before LeaderAndIsrRequest from controller. If we still worry
> about
> > > the
> > > > > > order they are sent, the reassignment tool can first send
> > > > > > ChangeReplicaDirRequest (so that broker remembers it in memory),
> > > create
> > > > > > reassignment znode, and then retry ChangeReplicaDirRequest if the
> > > > > previous
> > > > > > ChangeReplicaDirResponse says the replica has not been created.
> > This
> > > > > should
> > > > > > give us the highest possible chance of creating replica in the
> > > correct
> > > > > > directory and 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-30 Thread Jun Rao
Hi, Dong,

I don't have further concerns. If there are no more comments from other
people, we can start the vote.

Thanks,

Jun

On Thu, Mar 30, 2017 at 10:59 AM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks much for the comment! Do you think we start vote for KIP-112 and
> KIP-113 if there is no further concern?
>
> Dong
>
> On Thu, Mar 30, 2017 at 10:40 AM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Ok, so it seems that in solution (2), if the tool exits successfully,
> then
> > we know for sure that all replicas will be in the right log dirs.
> Solution
> > (1) doesn't guarantee that. That seems better and we can go with your
> > current solution then.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Mar 24, 2017 at 4:28 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > No.. the current approach described in the KIP (see here
> > >  > > 3A+Support+replicas+movement+between+log+directories#KIP-
> > > 113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreas
> > > signreplicabetweenlogdirectoriesacrossbrokers>)
> > > also sends ChangeReplicaDirRequest before writing reassignment path in
> > ZK.
> > > I think we are discussing whether ChangeReplicaDirResponse (1) shows
> success
> > or
> > > (2) should specify ReplicaNotAvailableException, if replica has not
> been
> > > created yet.
> > >
> > > Since both solutions will send ChangeReplicaDirRequest before writing
> > > reassignment in ZK, their chance of creating replica in the right
> > directory
> > > is the same.
> > >
> > > To take care of the rarer case that some brokers go down immediately
> > after
> > > the reassignment tool is run, solution (1) requires reassignment tool
> to
> > > repeatedly send DescribeDirRequest and ChangeReplicaDirRequest, while
> > > solution (2) requires tool to only retry ChangeReplicaDirRequest if the
> > > response says ReplicaNotAvailableException. It seems that solution (2)
> is
> > > cleaner because ChangeReplicaDirRequest won't depend on
> > DescribeDirRequest.
> > > What do you think?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao  wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > We are just comparing whether it's better for the reassignment tool
> to
> > > > send ChangeReplicaDirRequest
> > > > (1) before or (2) after writing the reassignment path in ZK .
> > > >
> > > > In the case when all brokers are alive when the reassignment tool is
> > run,
> > > > (1) guarantees 100% that the new replicas will be in the right log
> dirs
> > > and
> > > > (2) can't.
> > > >
> > > > In the rarer case that some brokers go down immediately after the
> > > > reassignment tool is run, in either approach, there is a chance when
> > the
> > > > failed broker comes back, it will complete the pending reassignment
> > > process
> > > > by putting some replicas in the wrong log dirs.
> > > >
> > > > Implementation wise, (1) and (2) seem to be the same. So, it seems to
> > me
> > > > that (1) is better?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin 
> > wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Thanks much for the response! I agree with you that if multiple
> > > replicas
> > > > > are created in the wrong directory, we may waste resource if either
> > > > > replicaMoveThread number is low or intra.broker.throttled.rate is
> > slow.
> > > > > Then the question is whether the suggested approach increases the
> > > chance
> > > > of
> > > > > replica being created in the correct log directory.
> > > > >
> > > > > I think the answer is no due to the argument provided in the
> previous
> > > > > email. Sending ChangeReplicaDirRequest before updating znode has
> > > > negligible
> > > > > impact on the chance that the broker processes
> > ChangeReplicaDirRequest
> > > > > before LeaderAndIsrRequest from controller. If we still worry about
> > the
> > > > > order they are sent, the reassignment tool can first send
> > > > > ChangeReplicaDirRequest (so that broker remembers it in memory),
> > create
> > > > > reassignment znode, and then retry ChangeReplicaDirRequest if the
> > > > previous
> > > > > ChangeReplicaDirResponse says the replica has not been created.
> This
> > > > should
> > > > > give us the highest possible chance of creating replica in the
> > correct
> > > > > directory and avoid the problem of the suggested approach. I have
> > > updated
> > > > > "How
> > > > > to reassign replica between log directories across brokers" in the
> > KIP
> > > to
> > > > > explain this procedure.
> > > > >
> > > > > To answer your question, the reassignment tool should fail with a
> > > > > proper
> > > > > error message if user has specified log directory for a replica on
> an
> > > > > offline broker.  This is reasonable because reassignment tool can
> not
> > > > > guarantee 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-30 Thread Dong Lin
Hey Jun,

Thanks much for the comment! Do you think we start vote for KIP-112 and
KIP-113 if there is no further concern?

Dong

On Thu, Mar 30, 2017 at 10:40 AM, Jun Rao  wrote:

> Hi, Dong,
>
> Ok, so it seems that in solution (2), if the tool exits successfully, then
> we know for sure that all replicas will be in the right log dirs. Solution
> (1) doesn't guarantee that. That seems better and we can go with your
> current solution then.
>
> Thanks,
>
> Jun
>
> On Fri, Mar 24, 2017 at 4:28 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > No.. the current approach described in the KIP (see here
> >  > 3A+Support+replicas+movement+between+log+directories#KIP-
> > 113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreas
> > signreplicabetweenlogdirectoriesacrossbrokers>)
> > also sends ChangeReplicaDirRequest before writing reassignment path in
> ZK.
> > I think we are discussing whether ChangeReplicaDirResponse (1) shows success
> or
> > (2) should specify ReplicaNotAvailableException, if replica has not been
> > created yet.
> >
> > Since both solutions will send ChangeReplicaDirRequest before writing
> > reassignment in ZK, their chance of creating replica in the right
> directory
> > is the same.
> >
> > To take care of the rarer case that some brokers go down immediately
> after
> > the reassignment tool is run, solution (1) requires reassignment tool to
> > repeatedly send DescribeDirRequest and ChangeReplicaDirRequest, while
> > solution (2) requires tool to only retry ChangeReplicaDirRequest if the
> > response says ReplicaNotAvailableException. It seems that solution (2) is
> > cleaner because ChangeReplicaDirRequest won't depend on
> DescribeDirRequest.
> > What do you think?
> >
> > Thanks,
> > Dong
> >
> >
> > On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > We are just comparing whether it's better for the reassignment tool to
> > > send ChangeReplicaDirRequest
> > > (1) before or (2) after writing the reassignment path in ZK .
> > >
> > > In the case when all brokers are alive when the reassignment tool is
> run,
> > > (1) guarantees 100% that the new replicas will be in the right log dirs
> > and
> > > (2) can't.
> > >
> > > In the rarer case that some brokers go down immediately after the
> > > reassignment tool is run, in either approach, there is a chance when
> the
> > > failed broker comes back, it will complete the pending reassignment
> > process
> > > by putting some replicas in the wrong log dirs.
> > >
> > > Implementation wise, (1) and (2) seem to be the same. So, it seems to
> me
> > > that (1) is better?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin 
> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks much for the response! I agree with you that if multiple
> > replicas
> > > > are created in the wrong directory, we may waste resource if either
> > > > replicaMoveThread number is low or intra.broker.throttled.rate is
> slow.
> > > > Then the question is whether the suggested approach increases the
> > chance
> > > of
> > > > replica being created in the correct log directory.
> > > >
> > > > I think the answer is no due to the argument provided in the previous
> > > > email. Sending ChangeReplicaDirRequest before updating znode has
> > > negligible
> > > > impact on the chance that the broker processes
> ChangeReplicaDirRequest
> > > > before LeaderAndIsrRequest from controller. If we still worry about
> the
> > > > order they are sent, the reassignment tool can first send
> > > > ChangeReplicaDirRequest (so that broker remembers it in memory),
> create
> > > > reassignment znode, and then retry ChangeReplicaDirRequest if the
> > > previous
> > > > ChangeReplicaDirResponse says the replica has not been created. This
> > > should
> > > > give us the highest possible chance of creating replica in the
> correct
> > > > directory and avoid the problem of the suggested approach. I have
> > updated
> > > > "How
> > > > to reassign replica between log directories across brokers" in the
> KIP
> > to
> > > > explain this procedure.
> > > >
> > > > To answer your question, the reassignment tool should fail with a
> > > proper
> > > > error message if user has specified log directory for a replica on an
> > > > offline broker.  This is reasonable because reassignment tool can not
> > > > guarantee that the replica will be moved to the specified log
> directory
> > > if
> > > > the broker is offline. If all brokers are online, the reassignment
> tool
> > > may
> > > > hang for up to 10 seconds (by default) to retry ChangeReplicaDirRequest
> if
> > > any
> > > > replica has not been created already. User can change this timeout
> > value
> > > > using the newly-added --timeout argument of the reassignment tool.
> This
> > > is
> > > > specified in the Public Interface section 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-30 Thread Jun Rao
Hi, Dong,

Ok, so it seems that in solution (2), if the tool exits successfully, then
we know for sure that all replicas will be in the right log dirs. Solution
(1) doesn't guarantee that. That seems better and we can go with your
current solution then.

Thanks,

Jun

On Fri, Mar 24, 2017 at 4:28 PM, Dong Lin  wrote:

> Hey Jun,
>
> No.. the current approach described in the KIP (see here
>  3A+Support+replicas+movement+between+log+directories#KIP-
> 113:Supportreplicasmovementbetweenlogdirectories-2)Howtoreas
> signreplicabetweenlogdirectoriesacrossbrokers>)
> also sends ChangeReplicaDirRequest before writing reassignment path in ZK.
> I think we are discussing whether ChangeReplicaDirResponse (1) shows success or
> (2) should specify ReplicaNotAvailableException, if replica has not been
> created yet.
>
> Since both solutions will send ChangeReplicaDirRequest before writing
> reassignment in ZK, their chance of creating replica in the right directory
> is the same.
>
> To take care of the rarer case that some brokers go down immediately after
> the reassignment tool is run, solution (1) requires reassignment tool to
> repeatedly send DescribeDirRequest and ChangeReplicaDirRequest, while
> solution (2) requires tool to only retry ChangeReplicaDirRequest if the
> response says ReplicaNotAvailableException. It seems that solution (2) is
> cleaner because ChangeReplicaDirRequest won't depend on DescribeDirRequest.
> What do you think?
>
> Thanks,
> Dong
>
>
> On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > We are just comparing whether it's better for the reassignment tool to
> > send ChangeReplicaDirRequest
> > (1) before or (2) after writing the reassignment path in ZK .
> >
> > In the case when all brokers are alive when the reassignment tool is run,
> > (1) guarantees 100% that the new replicas will be in the right log dirs
> and
> > (2) can't.
> >
> > In the rarer case that some brokers go down immediately after the
> > reassignment tool is run, in either approach, there is a chance when the
> > failed broker comes back, it will complete the pending reassignment
> process
> > by putting some replicas in the wrong log dirs.
> >
> > Implementation wise, (1) and (2) seem to be the same. So, it seems to me
> > that (1) is better?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for the response! I agree with you that if multiple
> replicas
> > > are created in the wrong directory, we may waste resource if either
> > > replicaMoveThread number is low or intra.broker.throttled.rate is slow.
> > > Then the question is whether the suggested approach increases the
> chance
> > of
> > > replica being created in the correct log directory.
> > >
> > > I think the answer is no due to the argument provided in the previous
> > > email. Sending ChangeReplicaDirRequest before updating znode has
> > negligible
> > > impact on the chance that the broker processes ChangeReplicaDirRequest
> > > before LeaderAndIsrRequest from controller. If we still worry about the
> > > order they are sent, the reassignment tool can first send
> > > ChangeReplicaDirRequest (so that broker remembers it in memory), create
> > > reassignment znode, and then retry ChangeReplicaDirRequest if the
> > previous
> > > ChangeReplicaDirResponse says the replica has not been created. This
> > should
> > > give us the highest possible chance of creating replica in the correct
> > > directory and avoid the problem of the suggested approach. I have
> updated
> > > "How
> > > to reassign replica between log directories across brokers" in the KIP
> to
> > > explain this procedure.
> > >
> > > To answer your question, the reassignment tool should fail with a
> > proper
> > > error message if user has specified log directory for a replica on an
> > > offline broker.  This is reasonable because reassignment tool can not
> > > guarantee that the replica will be moved to the specified log directory
> > if
> > > the broker is offline. If all brokers are online, the reassignment tool
> > may
> > > hang for up to 10 seconds (by default) to retry ChangeReplicaDirRequest if
> > any
> > > replica has not been created already. User can change this timeout
> value
> > > using the newly-added --timeout argument of the reassignment tool. This
> > is
> > > specified in the Public Interface section in the KIP. The reassignment
> > tool
> > > will only block if user uses this new feature of reassigning replica
> to a
> > > specific log directory in the broker. Therefore it seems backward
> > > compatible.
> > >
> > > Does this address the concern?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao  wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > 11.2 I think there are a few reasons why the cross disk 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-24 Thread Dong Lin
Hey Jun,

No.. the current approach described in the KIP (see here
)
also sends ChangeReplicaDirRequest before writing reassignment path in ZK.
I think we are discussing whether ChangeReplicaDirResponse (1) shows success or
(2) should specify ReplicaNotAvailableException, if replica has not been
created yet.

Since both solutions will send ChangeReplicaDirRequest before writing the
reassignment in ZK, their chance of creating the replica in the right directory
is the same.

To take care of the rarer case that some brokers go down immediately after
the reassignment tool is run, solution (1) requires the reassignment tool to
repeatedly send DescribeDirRequest and ChangeReplicaDirRequest, while
solution (2) requires the tool to only retry ChangeReplicaDirRequest if the
response says ReplicaNotAvailableException. It seems that solution (2) is
cleaner because ChangeReplicaDirRequest won't depend on DescribeDirRequest.
What do you think?
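
For illustration, here is a sketch of the difference between the two completion
checks, using placeholder interfaces rather than the real request classes:

final class CompletionCheckSketch {
    interface Broker {
        // solution (1): ChangeReplicaDirResponse always shows success, so completion
        // has to be learned by polling DescribeDirRequest
        boolean describeDirsShowsReplicaInDir(String topicPartition, String logDir);
        // solution (2): ChangeReplicaDirResponse reports ReplicaNotAvailableException
        // until the replica has been created
        boolean changeReplicaDirSaysReplicaNotAvailable(String topicPartition, String logDir);
    }

    // Solution (1): the tool keeps polling DescribeDirRequest in addition to
    // re-sending ChangeReplicaDirRequest.
    static boolean waitUsingDescribeDirs(Broker b, String tp, String dir, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            if (b.describeDirsShowsReplicaInDir(tp, dir)) return true;
        }
        return false;
    }

    // Solution (2): the tool only retries ChangeReplicaDirRequest while the broker
    // says the replica has not been created yet; no DescribeDirRequest involved.
    static boolean waitUsingChangeReplicaDir(Broker b, String tp, String dir, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            if (!b.changeReplicaDirSaysReplicaNotAvailable(tp, dir)) return true;
        }
        return false;
    }
}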

Thanks,
Dong


On Fri, Mar 24, 2017 at 3:56 PM, Jun Rao  wrote:

> Hi, Dong,
>
> We are just comparing whether it's better for the reassignment tool to
> send ChangeReplicaDirRequest
> (1) before or (2) after writing the reassignment path in ZK .
>
> In the case when all brokers are alive when the reassignment tool is run,
> (1) guarantees 100% that the new replicas will be in the right log dirs and
> (2) can't.
>
> In the rarer case that some brokers go down immediately after the
> reassignment tool is run, in either approach, there is a chance when the
> failed broker comes back, it will complete the pending reassignment process
> by putting some replicas in the wrong log dirs.
>
> Implementation wise, (1) and (2) seem to be the same. So, it seems to me
> that (1) is better?
>
> Thanks,
>
> Jun
>
>
> On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks much for the response! I agree with you that if multiple replicas
> > are created in the wrong directory, we may waste resource if either
> > replicaMoveThread number is low or intra.broker.throttled.rate is slow.
> > Then the question is whether the suggested approach increases the chance
> of
> > replica being created in the correct log directory.
> >
> > I think the answer is no due to the argument provided in the previous
> > email. Sending ChangeReplicaDirRequest before updating znode has
> negligible
> > impact on the chance that the broker processes ChangeReplicaDirRequest
> > before LeaderAndIsrRequest from controller. If we still worry about the
> > order they are sent, the reassignment tool can first send
> > ChangeReplicaDirRequest (so that broker remembers it in memory), create
> > reassignment znode, and then retry ChangeReplicaDirRequest if the
> previous
> > ChangeReplicaDirResponse says the replica has not been created. This
> should
> > give us the highest possible chance of creating replica in the correct
> > directory and avoid the problem of the suggested approach. I have updated
> > "How
> > to reassign replica between log directories across brokers" in the KIP to
> > explain this procedure.
> >
> > To answer your question, the reassignment tool should fail with a
> proper
> > error message if user has specified log directory for a replica on an
> > offline broker.  This is reasonable because reassignment tool can not
> > guarantee that the replica will be moved to the specified log directory
> if
> > the broker is offline. If all brokers are online, the reassignment tool
> may
> > hang for up to 10 seconds (by default) to retry ChangeReplicaDirRequest if
> any
> > replica has not been created already. User can change this timeout value
> > using the newly-added --timeout argument of the reassignment tool. This
> is
> > specified in the Public Interface section in the KIP. The reassignment
> tool
> > will only block if user uses this new feature of reassigning replica to a
> > specific log directory in the broker. Therefore it seems backward
> > compatible.
> >
> > Does this address the concern?
> >
> > Thanks,
> > Dong
> >
> > On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > 11.2 I think there are a few reasons why the cross disk movement may
> not
> > > catch up if the replicas are created in the wrong log dirs to start
> with.
> > > (a) There could be more replica fetcher threads than the disk movement
> > > threads. (b) intra.broker.throttled.rate may be configured lower than
> the
> > > replica throttle rate. That's why I think getting the replicas created
> in
> > > the right log dirs will be better.
> > >
> > > For the corner case issue that you mentioned, I am not sure if the
> > approach
> > > in the KIP completely avoids that. If a broker is down when the
> partition
> > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-24 Thread Jun Rao
Hi, Dong,

We are just comparing whether it's better for the reassignment tool to
send ChangeReplicaDirRequest
(1) before or (2) after writing the reassignment path in ZK.

In the case when all brokers are alive when the reassignment tool is run,
(1) guarantees 100% that the new replicas will be in the right log dirs and
(2) can't.

In the rarer case that some brokers go down immediately after the
reassignment tool is run, in either approach, there is a chance when the
failed broker comes back, it will complete the pending reassignment process
by putting some replicas in the wrong log dirs.

Implementation wise, (1) and (2) seem to be the same. So, it seems to me
that (1) is better?

Thanks,

Jun


On Thu, Mar 23, 2017 at 11:54 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks much for the response! I agree with you that if multiple replicas
> are created in the wrong directory, we may waste resource if either
> replicaMoveThread number is low or intra.broker.throttled.rate is slow.
> Then the question is whether the suggested approach increases the chance of
> replica being created in the correct log directory.
>
> I think the answer is no due to the argument provided in the previous
> email. Sending ChangeReplicaDirRequest before updating znode has negligible
> impact on the chance that the broker processes ChangeReplicaDirRequest
> before LeaderAndIsrRequest from controller. If we still worry about the
> order they are sent, the reassignment tool can first send
> ChangeReplicaDirRequest (so that broker remembers it in memory), create
> reassignment znode, and then retry ChangeReplicaDirRequest if the previous
> ChangeReplicaDirResponse says the replica has not been created. This should
> give us the highest possible chance of creating replica in the correct
> directory and avoid the problem of the suggested approach. I have updated
> "How
> to reassign replica between log directories across brokers" in the KIP to
> explain this procedure.
>
> To answer your question, the reassignment tool should fail with a proper
> error message if user has specified log directory for a replica on an
> offline broker.  This is reasonable because reassignment tool can not
> guarantee that the replica will be moved to the specified log directory if
> the broker is offline. If all brokers are online, the reassignment tool may
> hang for up to 10 seconds (by default) to retry ChangeReplicaDirRequest if any
> replica has not been created already. User can change this timeout value
> using the newly-added --timeout argument of the reassignment tool. This is
> specified in the Public Interface section in the KIP. The reassignment tool
> will only block if user uses this new feature of reassigning replica to a
> specific log directory in the broker. Therefore it seems backward
> compatible.
>
> Does this address the concern?
>
> Thanks,
> Dong
>
> On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > 11.2 I think there are a few reasons why the cross disk movement may not
> > catch up if the replicas are created in the wrong log dirs to start with.
> > (a) There could be more replica fetcher threads than the disk movement
> > threads. (b) intra.broker.throttled.rate may be configured lower than the
> > replica throttle rate. That's why I think getting the replicas created in
> > the right log dirs will be better.
> >
> > For the corner case issue that you mentioned, I am not sure if the
> approach
> > in the KIP completely avoids that. If a broker is down when the partition
> > reassignment tool is started, does the tool just hang (keep retrying
> > ChangeReplicaDirRequest) until the broker comes back? Currently, the
> > partition reassignment tool doesn't block.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 21, 2017 at 11:24 AM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for the explanation. Please see below my thoughts.
> > >
> > > 10. I see. So you are concerned with the potential implementation
> > > complexity which I wasn't aware of. I think it is OK not to do log
> > > cleaning on the .move log since there can be only one such log in each
> > > directory. I have updated the KIP to specify this:
> > >
> > > "The log segments in topicPartition.move directory will be subject to
> log
> > > truncation, log retention in the same way as the log segments in the
> > source
> > > log directory. But we may not do log cleaning on the
> topicPartition.move
> > to
> > > simplify the implementation."
> > >
> > > 11.2 Now I get your point. I think we have slightly different
> expectation
> > > of the order in which the reassignment tools updates reassignment node
> in
> > > ZK and sends ChangeReplicaDirRequest.
> > >
> > > I think the reassignment tool should first create reassignment znode
> and
> > > then keep sending ChangeReplicaDirRequest until success. I think
> sending
> > > ChangeReplicaDirRequest before updating znode has negligible 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-24 Thread Dong Lin
Hey Jun,

Thanks much for the response! I agree with you that if multiple replicas
are created in the wrong directory, we may waste resource if either
replicaMoveThread number is low or intra.broker.throttled.rate is slow.
Then the question is whether the suggested approach increases the chance of
replica being created in the correct log directory.

I think the answer is no due to the argument provided in the previous
email. Sending ChangeReplicaDirRequest before updating znode has negligible
impact on the chance that the broker processes ChangeReplicaDirRequest
before the LeaderAndIsrRequest from the controller. If we are still worried
about the order in which they are sent, the reassignment tool can first send
ChangeReplicaDirRequest (so that the broker remembers it in memory), create the
reassignment znode, and then retry ChangeReplicaDirRequest if the previous
ChangeReplicaDirResponse says the replica has not been created. This should
give us the highest possible chance of creating the replica in the correct
directory and avoid the problem of the suggested approach. I have updated "How
to reassign replica between log directories across brokers" in the KIP to
explain this procedure.
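
To make the intended ordering concrete, here is a rough Java sketch of the
tool-side flow. The helper methods and the Result type are placeholders for the
real network/ZK calls, not actual Kafka code:

    import java.util.concurrent.TimeUnit;

    public class ReassignmentOrderingSketch {

        enum Result { SUCCESS, REPLICA_NOT_AVAILABLE }

        // Placeholders standing in for the real request/ZK plumbing.
        static Result sendChangeReplicaDirRequest(int brokerId, String topicPartition, String logDir) {
            return Result.REPLICA_NOT_AVAILABLE;
        }
        static void createReassignmentZnode(String reassignmentJson) { }

        static boolean assignReplicaDir(int brokerId, String tp, String logDir,
                                        String reassignmentJson, long timeoutMs) throws InterruptedException {
            // 1. Tell the broker the intended log dir so it can remember it in memory.
            sendChangeReplicaDirRequest(brokerId, tp, logDir);
            // 2. Create the reassignment znode so the controller sends LeaderAndIsrRequest.
            createReassignmentZnode(reassignmentJson);
            // 3. Retry until the broker confirms the replica is in the requested dir,
            //    bounded by the tool's --timeout value (10 seconds by default).
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                if (sendChangeReplicaDirRequest(brokerId, tp, logDir) == Result.SUCCESS) {
                    return true;
                }
                TimeUnit.MILLISECONDS.sleep(500); // back off between retries
            }
            return false; // the tool reports failure and asks the user to verify later
        }
    }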

To answer your question, the reassignment tool should fail with a proper
error message if the user has specified a log directory for a replica on an
offline broker. This is reasonable because the reassignment tool cannot
guarantee that the replica will be moved to the specified log directory if
the broker is offline. If all brokers are online, the reassignment tool may
hang for up to 10 seconds (by default), retrying ChangeReplicaDirRequest if
any replica has not been created yet. The user can change this timeout value
using the newly-added --timeout argument of the reassignment tool. This is
specified in the Public Interface section of the KIP. The reassignment tool
will only block if the user uses this new feature of reassigning a replica to
a specific log directory on the broker. Therefore it seems backward
compatible.

Does this address the concern?

Thanks,
Dong

On Thu, Mar 23, 2017 at 10:06 PM, Jun Rao  wrote:

> Hi, Dong,
>
> 11.2 I think there are a few reasons why the cross disk movement may not
> catch up if the replicas are created in the wrong log dirs to start with.
> (a) There could be more replica fetcher threads than the disk movement
> threads. (b) intra.broker.throttled.rate may be configured lower than the
> replica throttle rate. That's why I think getting the replicas created in
> the right log dirs will be better.
>
> For the corner case issue that you mentioned, I am not sure if the approach
> in the KIP completely avoids that. If a broker is down when the partition
> reassignment tool is started, does the tool just hang (keep retrying
> ChangeReplicaDirRequest) until the broker comes back? Currently, the
> partition reassignment tool doesn't block.
>
> Thanks,
>
> Jun
>
>
> On Tue, Mar 21, 2017 at 11:24 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for the explanation. Please see below my thoughts.
> >
> > 10. I see. So you are concerned with the potential implementation
> > complexity which I wasn't aware of. I think it is OK not to do log
> > cleaning on the .move log since there can be only one such log in each
> > directory. I have updated the KIP to specify this:
> >
> > "The log segments in topicPartition.move directory will be subject to log
> > truncation, log retention in the same way as the log segments in the
> source
> > log directory. But we may not do log cleaning on the topicPartition.move
> to
> > simplify the implementation."
> >
> > 11.2 Now I get your point. I think we have slightly different expectation
> > of the order in which the reassignment tools updates reassignment node in
> > ZK and sends ChangeReplicaDirRequest.
> >
> > I think the reassignment tool should first create reassignment znode and
> > then keep sending ChangeReplicaDirRequest until success. I think sending
> > ChangeReplicaDirRequest before updating znode has negligible impact on
> the
> > chance that the broker processes ChangeReplicaDirRequest before
> > LeaderAndIsrRequest from controller, because the time for controller to
> > receive ZK notification, handle state machine changes and send
> > LeaderAndIsrRequests should be much longer than the time for reassignment
> > tool to setup connection with broker and send ChangeReplicaDirRequest.
> Even
> > if broker receives LeaderAndIsrRequest a bit sooner, the data in the
> > original replica should be small enough for the .move log to catch up very
> > quickly, so that broker can swap the log soon after it receives
> > ChangeReplicaDirRequest -- otherwise the intra.broker.throttled.rate is
> > probably too small. Does this address your concern with the performance?
> >
> > One concern with the suggested approach is that the
> ChangeReplicaDirRequest
> > may be lost if broker crashes before it creates the replica. I agree it
> is
> > rare. But it will be confusing 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-23 Thread Jun Rao
Hi, Dong,

11.2 I think there are a few reasons why the cross disk movement may not
catch up if the replicas are created in the wrong log dirs to start with.
(a) There could be more replica fetcher threads than the disk movement
threads. (b) intra.broker.throttled.rate may be configured lower than the
replica throttle rate. That's why I think getting the replicas created in
the right log dirs will be better.

For the corner case issue that you mentioned, I am not sure if the approach
in the KIP completely avoids that. If a broker is down when the partition
reassignment tool is started, does the tool just hang (keep retrying
ChangeReplicaDirRequest) until the broker comes back? Currently, the
partition reassignment tool doesn't block.

Thanks,

Jun


On Tue, Mar 21, 2017 at 11:24 AM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for the explanation. Please see below my thoughts.
>
> 10. I see. So you are concerned with the potential implementation
> complexity which I wasn't aware of. I think it is OK not to do log
> cleaning on the .move log since there can be only one such log in each
> directory. I have updated the KIP to specify this:
>
> "The log segments in topicPartition.move directory will be subject to log
> truncation, log retention in the same way as the log segments in the source
> log directory. But we may not do log cleaning on the topicPartition.move to
> simplify the implementation."
>
> 11.2 Now I get your point. I think we have slightly different expectation
> of the order in which the reassignment tools updates reassignment node in
> ZK and sends ChangeReplicaDirRequest.
>
> I think the reassignment tool should first create reassignment znode and
> then keep sending ChangeReplicaDirRequest until success. I think sending
> ChangeReplicaDirRequest before updating znode has negligible impact on the
> chance that the broker processes ChangeReplicaDirRequest before
> LeaderAndIsrRequest from controller, because the time for controller to
> receive ZK notification, handle state machine changes and send
> LeaderAndIsrRequests should be much longer than the time for reassignment
> tool to setup connection with broker and send ChangeReplicaDirRequest. Even
> if broker receives LeaderAndIsrRequest a bit sooner, the data in the
> original replica should be small enough for the .move log to catch up very
> quickly, so that broker can swap the log soon after it receives
> ChangeReplicaDirRequest -- otherwise the intra.broker.throttled.rate is
> probably too small. Does this address your concern with the performance?
>
> One concern with the suggested approach is that the ChangeReplicaDirRequest
> may be lost if broker crashes before it creates the replica. I agree it is
> rare. But it will be confusing when it happens. Operators would have to
> keep verifying reassignment and possibly retry execution until success if
> they want to make sure that the ChangeReplicaDirRequest is executed.
>
> Thanks,
> Dong
>
>
>
> On Tue, Mar 21, 2017 at 8:37 AM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > 10. I was mainly concerned about the additional complexity needed to
> > support log cleaning in the .move log. For example, LogToClean is keyed
> off
> > TopicPartition. To be able to support cleaning different instances of the
> > same partition, we need additional logic. I am not sure how much additional
> > complexity is needed and whether it's worth it. If we don't do log
> cleaning
> > at all on the .move log, then we don't have to change the log cleaner's
> > code.
> >
> > 11.2 I was thinking of the following flow. In the execute phase, the
> > reassignment tool first issues a ChangeReplicaDirRequest to brokers where
> > new replicas will be created. The brokers remember the mapping and
> return a
> > successful code. The reassignment tool then initiates the cross broker
> > movement through the controller. In the verify phase, in addition to
> > checking the replica assignment at the brokers, it issues
> > DescribeDirsRequest to check the replica to log dirs mapping. For each
> > partition in the response, the broker returns a state to indicate whether
> > the replica is final, temporary or pending. If all replicas are in the
> > final state, the tool checks if all replicas are in the expected log
> dirs.
> > If they are not, output a warning (and perhaps suggest the users to move
> > the data again). However, this should be rare.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for the response! It seems that we have only two remaining
> issues.
> > > Please see my reply below.
> > >
> > > On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao  wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the update. A few replies inlined below.
> > > >
> > > > On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin 
> > wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-21 Thread Dong Lin
Hey Jun,

Thanks for the explanation. Please see below my thoughts.

10. I see. So you are concerned with the potential implementation
complexity which I wasn't aware of. I think it is OK not to do log
cleaning on the .move log since there can be only one such log in each
directory. I have updated the KIP to specify this:

"The log segments in topicPartition.move directory will be subject to log
truncation, log retention in the same way as the log segments in the source
log directory. But we may not do log cleaning on the topicPartition.move to
simplify the implementation."

11.2 Now I get your point. I think we have slightly different expectations
of the order in which the reassignment tool updates the reassignment node in
ZK and sends ChangeReplicaDirRequest.

I think the reassignment tool should first create the reassignment znode and
then keep sending ChangeReplicaDirRequest until success. I think sending
ChangeReplicaDirRequest before updating the znode has negligible impact on the
chance that the broker processes ChangeReplicaDirRequest before the
LeaderAndIsrRequest from the controller, because the time for the controller to
receive the ZK notification, handle state machine changes and send
LeaderAndIsrRequests should be much longer than the time for the reassignment
tool to set up a connection with the broker and send ChangeReplicaDirRequest.
Even if the broker receives the LeaderAndIsrRequest a bit sooner, the data in
the original replica should be small enough for the .move log to catch up very
quickly, so that the broker can swap the log soon after it receives
ChangeReplicaDirRequest -- otherwise the intra.broker.throttled.rate is
probably too small. Does this address your concern with the performance?

One concern with the suggested approach is that the ChangeReplicaDirRequest
may be lost if the broker crashes before it creates the replica. I agree it is
rare. But it will be confusing when it happens. Operators would have to
keep verifying the reassignment and possibly retry the execution until success
if they want to make sure that the ChangeReplicaDirRequest is executed.

Thanks,
Dong



On Tue, Mar 21, 2017 at 8:37 AM, Jun Rao  wrote:

> Hi, Dong,
>
> 10. I was mainly concerned about the additional complexity needed to
> support log cleaning in the .move log. For example, LogToClean is keyed off
> TopicPartition. To be able to support cleaning different instances of the
> same partition, we need additional logic. I am not sure how much additional
> complexity is needed and whether it's worth it. If we don't do log cleaning
> at all on the .move log, then we don't have to change the log cleaner's
> code.
>
> 11.2 I was thinking of the following flow. In the execute phase, the
> reassignment tool first issues a ChangeReplicaDirRequest to brokers where
> new replicas will be created. The brokers remember the mapping and return a
> successful code. The reassignment tool then initiates the cross broker
> movement through the controller. In the verify phase, in addition to
> checking the replica assignment at the brokers, it issues
> DescribeDirsRequest to check the replica to log dirs mapping. For each
> partition in the response, the broker returns a state to indicate whether
> the replica is final, temporary or pending. If all replicas are in the
> final state, the tool checks if all replicas are in the expected log dirs.
> If they are not, output a warning (and perhaps suggest the users to move
> the data again). However, this should be rare.
>
> Thanks,
>
> Jun
>
>
> On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for the response! It seems that we have only two remaining issues.
> > Please see my reply below.
> >
> > On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the update. A few replies inlined below.
> > >
> > > On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin 
> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks for your comment! Please see my reply below.
> > > >
> > > > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 10. Could you comment on that?
> > > > >
> > > >
> > > > Sorry, I missed that comment.
> > > >
> > > > Good point. I think the log segments in topicPartition.move directory
> > > will
> > > > be subject to log truncation, log retention and log cleaning in the
> > same
> > > > way as the log segments in the source log directory. I just specified
> > > this
> > > > in the KIP.
> > > >
> > > >
> > > This is ok, but doubles the overhead of log cleaning. We probably want
> to
> > > think a bit more on this.
> > >
> >
> > I think this is OK because the number of replicas that are being moved is
> > limited by the number of ReplicaMoveThreads. The default number of
> > ReplicaMoveThreads is the number of log directories, which means we incur
> > this overhead for at most one replica per log 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-21 Thread Jun Rao
Hi, Dong,

10. I was mainly concerned about the additional complexity needed to
support log cleaning in the .move log. For example, LogToClean is keyed off
TopicPartition. To be able to support cleaning different instances of the
same partition, we need additional logic. I am not how much additional
complexity is needed and whether it's worth it. If we don't do log cleaning
at all on the .move log, then we don't have to change the log cleaner's
code.

11.2 I was thinking of the following flow. In the execute phase, the
reassignment tool first issues a ChangeReplicaDirRequest to brokers where
new replicas will be created. The brokers remember the mapping and return a
successful code. The reassignment tool then initiates the cross broker
movement through the controller. In the verify phase, in addition to
checking the replica assignment at the brokers, it issues
DescribeDirsRequest to check the replica to log dirs mapping. For each
partition in the response, the broker returns a state to indicate whether
the replica is final, temporary or pending. If all replicas are in the
final state, the tool checks if all replicas are in the expected log dirs.
If they are not, the tool outputs a warning (and perhaps suggests that the
users move the data again). However, this should be rare.
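
To illustrate the verify phase, here is a rough Java sketch of the check
described above; the types and field names are only assumptions for
illustration, not the final protocol:

    import java.util.Map;

    public class VerifyPhaseSketch {

        enum ReplicaState { FINAL, TEMPORARY, PENDING }

        static void verify(Map<String, ReplicaState> replicaStates,  // topic-partition -> state from DescribeDirsResponse
                           Map<String, String> actualLogDirs,        // topic-partition -> log dir reported by the broker
                           Map<String, String> expectedLogDirs) {    // topic-partition -> log dir requested by the user
            boolean allFinal = replicaStates.values().stream()
                    .allMatch(s -> s == ReplicaState.FINAL);
            if (!allFinal) {
                System.out.println("Reassignment still in progress (some replicas are temporary or pending).");
                return;
            }
            for (Map.Entry<String, String> e : expectedLogDirs.entrySet()) {
                String tp = e.getKey();
                if (!e.getValue().equals(actualLogDirs.get(tp))) {
                    // Should be rare; suggest that the user moves the data again.
                    System.out.println("WARNING: " + tp + " ended up in " + actualLogDirs.get(tp)
                            + " instead of " + e.getValue());
                }
            }
        }
    }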

Thanks,

Jun


On Mon, Mar 20, 2017 at 10:46 AM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for the response! It seems that we have only two remaining issues.
> Please see my reply below.
>
> On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the update. A few replies inlined below.
> >
> > On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for your comment! Please see my reply below.
> > >
> > > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao  wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 10. Could you comment on that?
> > > >
> > >
> > > Sorry, I missed that comment.
> > >
> > > Good point. I think the log segments in topicPartition.move directory
> > will
> > > be subject to log truncation, log retention and log cleaning in the
> same
> > > way as the log segments in the source log directory. I just specified
> > this
> > > in the KIP.
> > >
> > >
> > This is ok, but doubles the overhead of log cleaning. We probably want to
> > think a bit more on this.
> >
>
> I think this is OK because the number of replicas that are being moved is
> limited by the number of ReplicaMoveThreads. The default number of
> ReplicaMoveThreads is the number of log directories, which means we incur
> this overhead for at most one replica per log directory at any time.
> Suppose there are more than 100 replicas in any log directory; then the increase
> in overhead is less than 1%.
>
> Another way to look at this is that this is no worse than replica
> reassignment. When we reassign a replica from one broker to another, we will
> double the overhead of log cleaning in the cluster for this replica. If we
> are OK with this then we are OK with replica movement between log
> directories.
>
>
> >
> >
> > >
> > > >
> > > > 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost
> if
> > > > broker
> > > > restarts after it sends ChangeReplicaDirResponse but before it
> receives
> > > > LeaderAndIsrRequest."
> > > >
> > > > In that case, the reassignment tool could detect that through
> > > > DescribeDirsRequest
> > > > and issue ChangeReplicaDirRequest again, right? In the common case,
> > this
> > > is
> > > > probably not needed and we only need to write each replica once.
> > > >
> > > > My main concern with the approach in the current KIP is that once a
> new
> > > > replica is created in the wrong log dir, the cross log directory
> > movement
> > > > may not catch up until the new replica is fully bootstrapped. So, we
> > end
> > > up
> > > > writing the data for the same replica twice.
> > > >
> > >
> > > I agree with your concern. My main concern is that it is a bit weird if
> > > ChangeReplicaDirResponse can not guarantee success and the tool needs
> to
> > > rely on DescribeDirResponse to see if it needs to send
> > > ChangeReplicaDirRequest again.
> > >
> > > How about this: If the broker does not already have a replica created for
> > the
> > > specified topicPartition when it receives ChangeReplicaDirRequest, it
> will
> > > reply ReplicaNotAvailableException AND remember (replica, destination
> log
> > > directory) pair in memory to create the replica in the specified log
> > > directory.
> > >
> > >
> > I am not sure if returning ReplicaNotAvailableException is useful? What
> > will the client do on receiving ReplicaNotAvailableException in this
> case?
> >
> > Perhaps we could just replace the is_temporary field in
> > DescribeDirsResponsePartition with a state field. We can use 0 to
> indicate
> > the partition is created, 1 to indicate the partition is temporary and 2
> to
> > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-20 Thread Dong Lin
Hey Jun,

Thanks for the response! It seems that we have only two remaining issues.
Please see my reply below.

On Mon, Mar 20, 2017 at 7:45 AM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the update. A few replies inlined below.
>
> On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for your comment! Please see my reply below.
> >
> > On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the reply.
> > >
> > > 10. Could you comment on that?
> > >
> >
> > Sorry, I missed that comment.
> >
> > Good point. I think the log segments in topicPartition.move directory
> will
> > be subject to log truncation, log retention and log cleaning in the same
> > way as the log segments in the source log directory. I just specified
> this
> > in the KIP.
> >
> >
> This is ok, but doubles the overhead of log cleaning. We probably want to
> think a bit more on this.
>

I think this is OK because the number of replicas that are being moved is
limited by the number of ReplicaMoveThreads. The default number of
ReplicaMoveThreads is the number of log directories, which means we incur
this overhead for at most one replica per log directory at any time.
Suppose there are more than 100 replicas in any log directory; then the increase
in overhead is less than 1%.

Another way to look at this is that this is no worse than replica
reassignment. When we reassign a replica from one broker to another, we will
double the overhead of log cleaning in the cluster for this replica. If we
are OK with that, then we are OK with replica movement between log
directories.


>
>
> >
> > >
> > > 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
> > > broker
> > > restarts after it sends ChangeReplicaDirResponse but before it receives
> > > LeaderAndIsrRequest."
> > >
> > > In that case, the reassignment tool could detect that through
> > > DescribeDirsRequest
> > > and issue ChangeReplicaDirRequest again, right? In the common case,
> this
> > is
> > > probably not needed and we only need to write each replica once.
> > >
> > > My main concern with the approach in the current KIP is that once a new
> > > replica is created in the wrong log dir, the cross log directory
> movement
> > > may not catch up until the new replica is fully bootstrapped. So, we
> end
> > up
> > > writing the data for the same replica twice.
> > >
> >
> > I agree with your concern. My main concern is that it is a bit weird if
> > ChangeReplicaDirResponse can not guarantee success and the tool needs to
> > rely on DescribeDirResponse to see if it needs to send
> > ChangeReplicaDirRequest again.
> >
> > How about this: If the broker does not already have a replica created for
> the
> > specified topicPartition when it receives ChangeReplicaDirRequest, it will
> > reply ReplicaNotAvailableException AND remember (replica, destination log
> > directory) pair in memory to create the replica in the specified log
> > directory.
> >
> >
> I am not sure if returning ReplicaNotAvailableException is useful? What
> will the client do on receiving ReplicaNotAvailableException in this case?
>
> Perhaps we could just replace the is_temporary field in
> DescribeDirsResponsePartition with a state field. We can use 0 to indicate
> the partition is created, 1 to indicate the partition is temporary and 2 to
> indicate that the partition is pending.
>

ReplicaNotAvailableException is useful because the client can re-send
ChangeReplicaDirRequest (with backoff) after receiving
ReplicaNotAvailableException in the response. ChangeReplicaDirRequest will
only succeed after the replica has been created for the specified partition in
the broker.

I think this is cleaner than asking the reassignment tool to detect that
through DescribeDirsRequest and issue ChangeReplicaDirRequest again. Both
solutions have the same chance of writing the data for the same replica
twice. In the original solution, the reassignment tool will keep retrying
ChangeReplicaDirRequest until success. In the second suggested solution,
the reassignment tool needs to send ChangeReplicaDirRequest, send
DescribeDirsRequest to verify the result, and retry ChangeReplicaDirRequest and
DescribeDirsRequest again if the replica hasn't been created yet. Thus
the second solution couples ChangeReplicaDirRequest with
DescribeDirsRequest and makes the tool's logic a bit more complicated.

Besides, I am not sure I understand your suggestion for the is_temporary field.
It seems that a replica can have only two states, i.e. normal, if it is being
used to serve fetch/produce requests, and temporary, if it is a replica that is
catching up with the normal one. If you think we should have the
reassignment tool send DescribeDirsRequest before retrying
ChangeReplicaDirRequest, can you elaborate a bit on what the "pending"
state is?


>
>
> > >
> > > 11.3 Are you saying the value in --throttle will be used to set both
> > > intra.broker.throttled.rate and 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-20 Thread Jun Rao
Hi, Dong,

Thanks for the update. A few replies inlined below.

On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for your comment! Please see my reply below.
>
> On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the reply.
> >
> > 10. Could you comment on that?
> >
>
> Sorry, I missed that comment.
>
> Good point. I think the log segments in topicPartition.move directory will
> be subject to log truncation, log retention and log cleaning in the same
> way as the log segments in the source log directory. I just specified this
> in the KIP.
>
>
This is ok, but doubles the overhead of log cleaning. We probably want to
think a bit more on this.


>
> >
> > 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
> > broker
> > restarts after it sends ChangeReplicaDirResponse but before it receives
> > LeaderAndIsrRequest."
> >
> > In that case, the reassignment tool could detect that through
> > DescribeDirsRequest
> > and issue ChangeReplicaDirRequest again, right? In the common case, this
> is
> > probably not needed and we only need to write each replica once.
> >
> > My main concern with the approach in the current KIP is that once a new
> > replica is created in the wrong log dir, the cross log directory movement
> > may not catch up until the new replica is fully bootstrapped. So, we end
> up
> > writing the data for the same replica twice.
> >
>
> I agree with your concern. My main concern is that it is a bit weird if
> ChangeReplicaDirResponse can not guarantee success and the tool needs to
> rely on DescribeDirResponse to see if it needs to send
> ChangeReplicaDirRequest again.
>
> How about this: If the broker does not already have a replica created for the
> specified topicPartition when it receives ChangeReplicaDirRequest, it will
> reply ReplicaNotAvailableException AND remember (replica, destination log
> directory) pair in memory to create the replica in the specified log
> directory.
>
>
I am not sure if returning ReplicaNotAvailableException is useful? What
will the client do on receiving ReplicaNotAvailableException in this case?

Perhaps we could just replace the is_temporary field in
DescribeDirsResponsePartition with a state field. We can use 0 to indicate
the partition is created, 1 to indicate the partition is temporary and 2 to
indicate that the partition is pending.
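
In code, that state field could look roughly like the following enum; the
numeric codes are just the values suggested above, not a committed wire format:

    public enum ReplicaDirState {
        CREATED((byte) 0),    // replica exists in its final log dir
        TEMPORARY((byte) 1),  // the .move copy that is still catching up
        PENDING((byte) 2);    // broker has remembered the target dir but no replica exists yet

        private final byte code;
        ReplicaDirState(byte code) { this.code = code; }
        public byte code() { return code; }
    }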


> >
> > 11.3 Are you saying the value in --throttle will be used to set both
> > intra.broker.throttled.rate and leader.follower.replication.
> > throttled.replicas?
> >
>
> No. --throttle will be used only to set leader.follower.replication as
> it does now. I think we do not need any option in
> kafka-reassign-partitions.sh to specify intra.broker.throttled.rate.
> User can set it in the broker config or dynamically using kafka-configs.sh. Does
> this sound OK?
>
>
Ok. This sounds good. It would be useful to make this clear in the wiki.


>
> >
> > 12.2 If the user only wants to check one topic, the tool could do the
> > filtering on the client side, right? My concern with having both log_dirs
> > and topics is the semantic. For example, if both are not empty, do we
> > return the intersection or the union?
> >
>
> Yes, the tool could filter on the client side. But the purpose of having
> this field is to reduce the response size in case the broker has a lot of topics.
> Both fields are used as filters and the result is the intersection. Do you
> think this semantic is confusing or counter-intuitive?


>

Ok. Could we document the semantic when both dirs and topics are specified?

Thanks,

Jun

>
> >
> > On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for your detailed comments. Please see my reply below.
> > >
> > > On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao  wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the updated KIP. Some more comments below.
> > > >
> > > > 10. For the .move log, do we perform any segment deletion (based on
> > > > retention) or log cleaning (if a compacted topic)? Or do we only
> enable
> > > > that after the swap?
> > > >
> > > > 11. kafka-reassign-partitions.sh
> > > > 11.1 If all reassigned replicas are in the current broker and only
> the
> > > log
> > > > directories have changed, we can probably optimize the tool to not
> > > trigger
> > > > partition reassignment through the controller and only
> > > > send ChangeReplicaDirRequest.
> > > >
> > >
> > > Yes, the reassignment script should not create the reassignment znode
> if
> > no
> > > replicas are to be moved between brokers. This falls into the "How to
> > move
> > > replica between log directories on the same broker" of the Proposed
> > Change
> > > section.
> > >
> > >
> > > > 11.2 If ChangeReplicaDirRequest specifies a replica that's not
> created
> > > yet,
> > > > could the broker just remember that in memory and create the replica
> 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-16 Thread Dong Lin
Hey Jun,

After thinking about 14 more, I think your solution is reasonable. I have
updated the KIP to specify that the number of ReplicaMoveThreads defaults
to # log dirs.

Thanks!
Dong


On Thu, Mar 16, 2017 at 12:28 AM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for your comment! Please see my reply below.
>
> On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply.
>>
>> 10. Could you comment on that?
>>
>
> Sorry, I missed that comment.
>
> Good point. I think the log segments in topicPartition.move directory will
> be subject to log truncation, log retention and log cleaning in the same
> way as the log segments in the source log directory. I just specified this
> in the KIP.
>
>
>>
>> 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
>> broker
>> restarts after it sends ChangeReplicaDirResponse but before it receives
>> LeaderAndIsrRequest."
>>
>> In that case, the reassignment tool could detect that through
>> DescribeDirsRequest
>> and issue ChangeReplicaDirRequest again, right? In the common case, this
>> is
>> probably not needed and we only need to write each replica once.
>>
>> My main concern with the approach in the current KIP is that once a new
>> replica is created in the wrong log dir, the cross log directory movement
>> may not catch up until the new replica is fully bootstrapped. So, we end
>> up
>> writing the data for the same replica twice.
>>
>
> I agree with your concern. My main concern is that it is a bit weird if
> ChangeReplicaDirResponse can not guarantee success and the tool needs to
> rely on DescribeDirResponse to see if it needs to send
> ChangeReplicaDirRequest again.
>
> How about this: If the broker does not already have a replica created for the
> specified topicPartition when it receives ChangeReplicaDirRequest, it will
> reply ReplicaNotAvailableException AND remember (replica, destination log
> directory) pair in memory to create the replica in the specified log
> directory.
>
>
>>
>> 11.3 Are you saying the value in --throttle will be used to set both
>> intra.broker.throttled.rate and leader.follower.replication.
>> throttled.replicas?
>>
>
> No. --throttle will be used only to set leader.follower.replication as
> it does now. I think we do not need any option in
> kafka-reassign-partitions.sh to specify intra.broker.throttled.rate.
> User can set it in the broker config or dynamically using kafka-configs.sh. Does
> this sound OK?
>
>
>>
>> 12.2 If the user only wants to check one topic, the tool could do the
>> filtering on the client side, right? My concern with having both log_dirs
>> and topics is the semantic. For example, if both are not empty, do we
>> return the intersection or the union?
>>
>
> Yes, the tool could filter on the client side. But the purpose of having
> this field is to reduce the response size in case the broker has a lot of topics.
> Both fields are used as filters and the result is the intersection. Do you
> think this semantic is confusing or counter-intuitive?
>
>
>>
>> 12.3. Yes, firstOffset may not be useful.
>>
>> 14. Hmm, I would think moving data across log dirs will be io bound. We
>> also have num.recovery.threads.per.data.dir, which defaults to 1. So,
>> having num.replica.move.threads defaults to # log dirs or half of that (to
>> account for ios on both source and target) seems reasonable. Is a magical
>> value of 3 going to be better? Does that work with only 2 log dirs? There
>> will always be cases when the user needs to customize the value. We just
>> need a reasonable default to cover the common case.
>>
>
> If the throughput of moving data across dirs does not increase with
> number of threads, I think we should provide config
> num.replica.move.thread.per.log.dir and give it default value of 1. That
> works in the same way as num.recovery.threads.per.data.dir. But I think
> the replica movement is not necessarily IO bound if broker is using SSD.
> Thus it seems more reasonable to have config num.replica.move.threads that
> is shared across all log directories.
>
> Currently all Kafka configs, including num.recovery.threads.per.data.dir,
> default to a constant value instead of relying on values of configs. Thus
> it will be a bit weird if the config name itself is not per log dir but its
> default value is per dir. And it will also make both code and user
> documentation a bit more complicated because currently all configs,
> including num.recovery.threads.per.data.dir, defaults to a constant
> value. The advantage of using a magic value is simplicity. To answer your
> question, I think 3 ReplicaMoveThreads can work with more than 2 log
> directories. Say there are 3 ReplicaMoveThreads and 4 log directories, each
> ReplicaMoveThread will check if there is any replica waiting for movement,
> finish movement of this replica, and check again. Is there any concern with
> this approach?
>
> I have chosen the magic value 3 because current 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-16 Thread Dong Lin
Hey Jun,

Thanks for your comment! Please see my reply below.

On Wed, Mar 15, 2017 at 9:45 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the reply.
>
> 10. Could you comment on that?
>

Sorry, I missed that comment.

Good point. I think the log segments in topicPartition.move directory will
be subject to log truncation, log retention and log cleaning in the same
way as the log segments in the source log directory. I just specified this
in the KIP.


>
> 11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
> broker
> restarts after it sends ChangeReplicaDirResponse but before it receives
> LeaderAndIsrRequest."
>
> In that case, the reassignment tool could detect that through
> DescribeDirsRequest
> and issue ChangeReplicaDirRequest again, right? In the common case, this is
> probably not needed and we only need to write each replica once.
>
> My main concern with the approach in the current KIP is that once a new
> replica is created in the wrong log dir, the cross log directory movement
> may not catch up until the new replica is fully bootstrapped. So, we end up
> writing the data for the same replica twice.
>

I agree with your concern. My main concern is that it is a bit weird if
ChangeReplicaDirResponse cannot guarantee success and the tool needs to
rely on DescribeDirsResponse to see if it needs to send
ChangeReplicaDirRequest again.

How about this: If the broker does not already have a replica created for the
specified topicPartition when it receives ChangeReplicaDirRequest, it will
reply with ReplicaNotAvailableException AND remember the (replica, destination
log directory) pair in memory so it can create the replica in the specified log
directory.
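
To make that concrete, here is a very simplified Java sketch of the proposed
broker-side behaviour; the real broker code is Scala, so the class and method
names below are placeholders only:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class ChangeReplicaDirSketch {

        enum ErrorCode { NONE, REPLICA_NOT_AVAILABLE }

        private final Set<String> createdReplicas = ConcurrentHashMap.newKeySet();           // topic-partitions with a local Log
        private final Map<String, String> pendingDirAssignments = new ConcurrentHashMap<>(); // topic-partition -> desired log dir

        ErrorCode handleChangeReplicaDir(String topicPartition, String logDir) {
            if (!createdReplicas.contains(topicPartition)) {
                // Remember the desired dir so the replica is created there once the
                // LeaderAndIsrRequest arrives, but tell the caller to retry (with backoff).
                pendingDirAssignments.put(topicPartition, logDir);
                return ErrorCode.REPLICA_NOT_AVAILABLE;
            }
            startIntraBrokerMove(topicPartition, logDir); // existing replica: schedule a .move copy
            return ErrorCode.NONE;
        }

        // Called when the replica is created in response to a LeaderAndIsrRequest.
        String logDirForNewReplica(String topicPartition, String defaultDir) {
            createdReplicas.add(topicPartition);
            return pendingDirAssignments.getOrDefault(topicPartition, defaultDir);
        }

        private void startIntraBrokerMove(String topicPartition, String logDir) {
            // hand off to a ReplicaMoveThread (not shown)
        }
    }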


>
> 11.3 Are you saying the value in --throttle will be used to set both
> intra.broker.throttled.rate and leader.follower.replication.
> throttled.replicas?
>

No. --throttle will be used only to set leader.follower.replication as
it does now. I think we do not need any option in
kafka-reassign-partitions.sh to specify intra.broker.throttled.rate. The
user can set it in the broker config or dynamically using kafka-configs.sh.
Does this sound OK?


>
> 12.2 If the user only wants to check one topic, the tool could do the
> filtering on the client side, right? My concern with having both log_dirs
> and topics is the semantic. For example, if both are not empty, do we
> return the intersection or the union?
>

Yes, the tool could filter on the client side. But the purpose of having
this field is to reduce the response size in case the broker has a lot of
topics. Both fields are used as filters and the result is the intersection. Do
you think this semantic is confusing or counter-intuitive?
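
As a toy illustration of that filter semantics (null meaning "all", and
non-null log_dirs and topics both applied with the result being the
intersection), assuming a simple in-memory map instead of the real request
handling:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class DescribeDirsFilterSketch {

        // partitionsByLogDir maps a log dir to the topic-partitions it hosts,
        // where each topic-partition is formatted as "topic-partitionId".
        static Map<String, List<String>> filter(Map<String, List<String>> partitionsByLogDir,
                                                List<String> requestedDirs,    // null means every log dir
                                                List<String> requestedTopics)  // null means every topic
        {
            return partitionsByLogDir.entrySet().stream()
                    .filter(e -> requestedDirs == null || requestedDirs.contains(e.getKey()))
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            e -> e.getValue().stream()
                                    .filter(tp -> requestedTopics == null
                                            || requestedTopics.contains(tp.substring(0, tp.lastIndexOf('-'))))
                                    .collect(Collectors.toList())));
        }
    }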


>
> 12.3. Yes, firstOffset may not be useful.
>
> 14. Hmm, I would think moving data across log dirs will be io bound. We
> also have num.recovery.threads.per.data.dir, which defaults to 1. So,
> having num.replica.move.threads defaults to # log dirs or half of that (to
> account for ios on both source and target) seems reasonable. Is a magical
> value of 3 going to be better? Does that work with only 2 log dirs? There
> will always be cases when the user needs to customize the value. We just
> need a reasonable default to cover the common case.
>

If the throughput of moving data across directories does not increase with the
number of threads, I think we should provide a config
num.replica.move.thread.per.log.dir and give it a default value of 1. That
works in the same way as num.recovery.threads.per.data.dir. But I think the
replica movement is not necessarily IO bound if the broker is using SSDs. Thus
it seems more reasonable to have a config num.replica.move.threads that is
shared across all log directories.

Currently all Kafka configs, including num.recovery.threads.per.data.dir,
default to a constant value instead of relying on the values of other configs.
Thus it will be a bit weird if the config name itself is not per log dir but its
default value is per dir. It will also make both the code and the user
documentation a bit more complicated, because currently all configs,
including num.recovery.threads.per.data.dir, default to a constant value.
The advantage of using a magic value is simplicity. To answer your
question, I think 3 ReplicaMoveThreads can work with more than 2 log
directories. Say there are 3 ReplicaMoveThreads and 4 log directories: each
ReplicaMoveThread will check whether there is any replica waiting for movement,
finish the movement of that replica, and check again. Is there any concern with
this approach?
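
Roughly, the shared pool would look like the sketch below, where a fixed number
of mover threads drain a single queue of pending moves across all log
directories (MoveTask and moveReplica are placeholders, not actual Kafka
classes):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ReplicaMovePoolSketch {

        static final class MoveTask {
            final String topicPartition;
            final String destinationLogDir;
            MoveTask(String topicPartition, String destinationLogDir) {
                this.topicPartition = topicPartition;
                this.destinationLogDir = destinationLogDir;
            }
        }

        private final BlockingQueue<MoveTask> pendingMoves = new LinkedBlockingQueue<>();

        void start(int numReplicaMoveThreads) {
            for (int i = 0; i < numReplicaMoveThreads; i++) {
                Thread t = new Thread(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            MoveTask task = pendingMoves.take(); // block until a replica is waiting for movement
                            moveReplica(task);                   // copy segments into <dir>/<topicPartition>.move, then swap
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }, "ReplicaMoveThread-" + i);
                t.start();
            }
        }

        private void moveReplica(MoveTask task) {
            // throttled copy, bounded by intra.broker.throttled.rate (not shown)
        }
    }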

I have chosen the magic value 3 because the current default number of network
threads is 3. We can also set it to 8, which is the default number of io
threads. Would there be any performance concern with using 8 threads by
default?



>
> 20. Should we support canceling the movement across log dirs? I was
> thinking this can be achieved with a ChangeReplicaDirRequest with dir =
> any.
>

As of the current KIP, the user can cancel movement across log directories by
first sending 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-15 Thread Jun Rao
Hi, Dong,

Thanks for the reply.

10. Could you comment on that?

11.2 "I am concerned that the ChangeReplicaDirRequest would be lost if
broker
restarts after it sends ChangeReplicaDirResponse but before it receives
LeaderAndIsrRequest."

In that case, the reassignment tool could detect that through
DescribeDirsRequest
and issue ChangeReplicaDirRequest again, right? In the common case, this is
probably not needed and we only need to write each replica once.

My main concern with the approach in the current KIP is that once a new
replica is created in the wrong log dir, the cross log directory movement
may not catch up until the new replica is fully bootstrapped. So, we end up
writing the data for the same replica twice.

11.3 Are you saying the value in --throttle will be used to set both
intra.broker.throttled.rate and leader.follower.replication.
throttled.replicas?

12.2 If the user only wants to check one topic, the tool could do the
filtering on the client side, right? My concern with having both log_dirs
and topics is the semantic. For example, if both are not empty, do we
return the intersection or the union?

12.3. Yes, firstOffset may not be useful.

14. Hmm, I would think moving data across log dirs will be io bound. We
also have num.recovery.threads.per.data.dir, which defaults to 1. So,
having num.replica.move.threads defaults to # log dirs or half of that (to
account for ios on both source and target) seems reasonable. Is a magical
value of 3 going to be better? Does that work with only 2 log dirs? There
will always be cases when the user needs to customize the value. We just
need a reasonable default to cover the common case.

20. Should we support canceling the movement across log dirs? I was
thinking this can be achieved with a ChangeReplicaDirRequest with dir = any.

Jun


On Mon, Mar 13, 2017 at 3:32 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks much for your detailed comments. Please see my reply below.
>
> On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the updated KIP. Some more comments below.
> >
> > 10. For the .move log, do we perform any segment deletion (based on
> > retention) or log cleaning (if a compacted topic)? Or do we only enable
> > that after the swap?
> >
> > 11. kafka-reassign-partitions.sh
> > 11.1 If all reassigned replicas are in the current broker and only the
> log
> > directories have changed, we can probably optimize the tool to not
> trigger
> > partition reassignment through the controller and only
> > send ChangeReplicaDirRequest.
> >
>
> Yes, the reassignment script should not create the reassignment znode if no
> replicas are to be moved between brokers. This falls into the "How to move
> replica between log directories on the same broker" of the Proposed Change
> section.
>
>
> > 11.2 If ChangeReplicaDirRequest specifies a replica that's not created
> yet,
> > could the broker just remember that in memory and create the replica when
> > the creation is requested? This way, when doing cluster expansion, we can
> > make sure that the new replicas on the new brokers are created in the
> right
> > log directory in the first place. We can also avoid the tool having to
> keep
> > issuing ChangeReplicaDirRequest in response to
> > ReplicaNotAvailableException.
> >
>
> I am concerned that the ChangeReplicaDirRequest would be lost if broker
> restarts after it sends ChangeReplicaDirResponse but before it receives
> LeaderAndIsrRequest. In this case, the user will receive success when they
> initiate replica reassignment, but replica reassignment will never complete
> when they verify the reassignment later. This would be confusing to user.
>
> There are three different approaches to this problem if broker has not
> created a replica yet after it receives ChangeReplicaDirRequest:
>
> 1) Broker immediately replies to user with ReplicaNotAvailableException and
> user can decide to retry again later. The advantage of this solution is
> that the broker logic is very simple and the reassignment script logic also
> seems straightforward. The disadvantage is that user script has to retry.
> But it seems fine - we can set interval between retries to be 0.5 sec so
> that the broker won't be bombarded by those requests. This is the solution
> chosen in the current KIP.
>
> 2) Broker can put ChangeReplicaDirRequest in a purgatory with timeout and
> replies to user after the replica has been created. I didn't choose this in
> the interest of keeping broker logic simpler.
>
> 3) Broker can remember that by making a mark in the disk, e.g. create
> topicPartition.tomove directory in the destination log directory. This mark
> will be persisted across broker restart. This is the first idea I had but I
> replaced it with solution 1) in the interest of keeping broker simple.
>
> It seems that solution 1) is the simplest one that works. But I am OK to
> switch to the other two solutions if we don't want the retry 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-13 Thread Dong Lin
Hey Jun,

Thanks much for your detailed comments. Please see my reply below.

On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Some more comments below.
>
> 10. For the .move log, do we perform any segment deletion (based on
> retention) or log cleaning (if a compacted topic)? Or do we only enable
> that after the swap?
>
> 11. kafka-reassign-partitions.sh
> 11.1 If all reassigned replicas are in the current broker and only the log
> directories have changed, we can probably optimize the tool to not trigger
> partition reassignment through the controller and only
> send ChangeReplicaDirRequest.
>

Yes, the reassignment script should not create the reassignment znode if no
replicas are to be moved between brokers. This falls into the "How to move
replica between log directories on the same broker" part of the Proposed Change
section.


> 11.2 If ChangeReplicaDirRequest specifies a replica that's not created yet,
> could the broker just remember that in memory and create the replica when
> the creation is requested? This way, when doing cluster expansion, we can
> make sure that the new replicas on the new brokers are created in the right
> log directory in the first place. We can also avoid the tool having to keep
> issuing ChangeReplicaDirRequest in response to
> ReplicaNotAvailableException.
>

I am concerned that the ChangeReplicaDirRequest would be lost if the broker
restarts after it sends ChangeReplicaDirResponse but before it receives
LeaderAndIsrRequest. In this case, the user will receive success when they
initiate the replica reassignment, but the reassignment will never complete
when they verify it later. This would be confusing to the user.

There are three different approaches to this problem if the broker has not
created a replica yet when it receives ChangeReplicaDirRequest:

1) The broker immediately replies to the user with ReplicaNotAvailableException
and the user can decide to retry again later. The advantage of this solution is
that the broker logic is very simple and the reassignment script logic also
seems straightforward. The disadvantage is that the user script has to retry.
But it seems fine - we can set the interval between retries to 0.5 sec so
that the broker won't be bombarded by those requests. This is the solution
chosen in the current KIP.

2) The broker can put ChangeReplicaDirRequest in a purgatory with a timeout and
reply to the user after the replica has been created. I didn't choose this in
the interest of keeping the broker logic simpler.

3) The broker can remember the request by making a mark on disk, e.g. creating
a topicPartition.tomove directory in the destination log directory. This mark
will be persisted across broker restarts. This is the first idea I had but I
replaced it with solution 1) in the interest of keeping the broker simple.

It seems that solution 1) is the simplest one that works. But I am OK to
switch to one of the other two solutions if we don't want the retry logic. What
do you think?
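
For what it's worth, option 3) could look roughly like the sketch below; the
.tomove marker name comes from the description above, while the class and
method names are made up for illustration:

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class ToMoveMarkerSketch {

        // Record the intent as a marker directory: <destinationLogDir>/<topicPartition>.tomove
        static void markPendingMove(String destinationLogDir, String topicPartition) throws IOException {
            File marker = new File(destinationLogDir, topicPartition + ".tomove");
            if (!marker.exists() && !marker.mkdirs()) {
                throw new IOException("Could not create marker " + marker);
            }
        }

        // On startup, rebuild the topic-partition -> destination dir mapping from the markers.
        static Map<String, String> loadPendingMoves(String[] logDirs) {
            Map<String, String> pending = new HashMap<>();
            for (String dir : logDirs) {
                File[] markers = new File(dir).listFiles((d, name) -> name.endsWith(".tomove"));
                if (markers == null) continue;
                for (File marker : markers) {
                    String name = marker.getName();
                    String tp = name.substring(0, name.length() - ".tomove".length());
                    pending.put(tp, dir);
                }
            }
            return pending;
        }
    }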


11.3 Do we need an option in the tool to specify intra.broker.
> throttled.rate?
>

I don't find it useful to add this option to kafka-reassign-partitions.sh.
The reason we have the option "--throttle" in the script to throttle the
replication rate is that we usually want a higher quota to fix an offline
replica to get out of URP. But we are OK with a lower quota if we are
moving replicas only to balance the cluster. Thus it is common for SREs to
use different quotas when using kafka-reassign-partitions.sh to move replicas
between brokers.

However, the only reason for moving replicas between log directories of the
same broker is to balance cluster resources. Thus the option to
specify intra.broker.throttled.rate in the tool is not that useful. I am
inclined not to add this option to keep the tool's usage simpler.


>
> 12. DescribeDirsRequest
> 12.1 In other requests like CreateTopicRequest, we return an empty list in
> the response for an empty input list. If the input list is null, we return
> everything. We should probably follow the same convention here.
>

Thanks. I wasn't aware of this convention. I have changed
DescribeDirsRequest so that "null" indicates "all".


> 12.2 Do we need the topics field? Since the request is about log dirs, it
> makes sense to specify the log dirs. But it's weird to specify topics.
>

The topics field is not necessary. But it is useful to reduce the response
size in case the user is only interested in the status of a few topics. For
example, the user may have initiated the reassignment of a given replica from
one log directory to another log directory on the same broker, and the user
only wants to check the status of this given partition by looking
at DescribeDirsResponse. Thus this field is useful.

I am not sure if it is weird to call this request DescribeDirsRequest. The
response is a map from log directory to information about some partitions on
the log directory. Do you think we need to change the name of the request?
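
In other words, the response shape is roughly the following; the field names
are illustrative only, not the exact schema in the KIP:

    import java.util.List;
    import java.util.Map;

    public class DescribeDirsShapeSketch {

        static final class PartitionDirInfo {
            final String topicPartition;
            final long sizeInBytes;      // size of the partition's log in this directory
            final boolean isTemporary;   // true for a <topicPartition>.move copy that is still catching up
            PartitionDirInfo(String topicPartition, long sizeInBytes, boolean isTemporary) {
                this.topicPartition = topicPartition;
                this.sizeInBytes = sizeInBytes;
                this.isTemporary = isTemporary;
            }
        }

        // log directory path -> partitions (and their state) in that directory
        Map<String, List<PartitionDirInfo>> describeDirs() {
            throw new UnsupportedOperationException("sketch only");
        }
    }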


> 12.3 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-13 Thread Jun Rao
Hi, Dong,

Thanks for the updated KIP. Some more comments below.

10. For the .move log, do we perform any segment deletion (based on
retention) or log cleaning (if a compacted topic)? Or do we only enable
that after the swap?

11. kafka-reassign-partitions.sh
11.1 If all reassigned replicas are in the current broker and only the log
directories have changed, we can probably optimize the tool to not trigger
partition reassignment through the controller and only
send ChangeReplicaDirRequest.
11.2 If ChangeReplicaDirRequest specifies a replica that's not created yet,
could the broker just remember that in memory and create the replica when
the creation is requested? This way, when doing cluster expansion, we can
make sure that the new replicas on the new brokers are created in the right
log directory in the first place. We can also avoid the tool having to keep
issuing ChangeReplicaDirRequest in response to ReplicaNotAvailableException.
11.3 Do we need an option in the tool to specify intra.broker.
throttled.rate?

12. DescribeDirsRequest
12.1 In other requests like CreateTopicRequest, we return an empty list in
the response for an empty input list. If the input list is null, we return
everything. We should probably follow the same convention here.
12.2 Do we need the topics field? Since the request is about log dirs, it
makes sense to specify the log dirs. But it's weird to specify topics.
12.3 DescribeDirsResponsePartition: Should we include firstOffset and
nextOffset in the response? That could be useful to track the progress of
the movement.

13. ChangeReplicaDirResponse: Do we need error code at both levels?

14. num.replica.move.threads: Does it default to # log dirs?

Thanks,

Jun

On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin  wrote:

> I just made one correction in the KIP. If broker receives
> ChangeReplicaDirRequest and the replica hasn't been created there, the
> broker will respond ReplicaNotAvailableException.
> The kafka-reassign-partitions.sh will need to re-send
> ChangeReplicaDirRequest in this case in order to wait for controller to
> send LeaderAndIsrRequest to broker. The previous approach of creating an
> empty directory seems hacky.
>
>
>
>
> On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for your comments! I have updated the KIP to address your
> comments.
> > Please see my reply inline.
> >
> > Can you let me know if the latest KIP has addressed your comments?
> >
> > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao  wrote:
> >
> >> Hi, Dong,
> >>
> >> Thanks for the reply.
> >>
> >> 1.3 So the thread gets the lock, checks if caught up and releases the
> lock
> >> if not? Then, in the case when there is continuous incoming data, the
> >> thread may never get a chance to swap. One way to address this is when
> the
> >> thread is getting really close in catching up, just hold onto the lock
> >> until the thread fully catches up.
> >>
> >
> > Yes, that was my original solution. I see your point that the lock may
> not
> > be fairly assigned to ReplicaMoveThread and RequestHandlerThread when
> there
> > are frequent incoming requests. Your solution should address the problem
> and I
> > have updated the KIP to use it.
> >
> >
> >>
> >> 2.3 So, you are saying that the partition reassignment tool can first
> send
> >> a ChangeReplicaDirRequest to relevant brokers to establish the log dir
> for
> >> replicas not created yet, then trigger the partition movement across
> >> brokers through the controller? That's actually a good idea. Then, we
> can
> >> just leave LeaderAndIsrRequest as it is.
> >
> >
> > Yes, that is what I plan to do. If broker receives a
> > ChangeReplicaDirRequest while it is not leader or follower of the
> > partition, the broker will create an empty Log instance (i.e. a directory
> > named topicPartition) in the destination log directory so that the
> replica
> > will be placed there when the broker receives LeaderAndIsrRequest from the
> > controller. The broker should clean up those empty Log instances on startup
> > just in case a ChangeReplicaDirRequest was mistakenly sent to a broker
> that
> > was not meant to be follower/leader of the partition..
> >
> >
> >> Another thing related to
> >> ChangeReplicaDirRequest.
> >> Since this request may take long to complete, I am not sure if we should
> >> wait for the movement to complete before respond. While waiting for the
> >> movement to complete, the idle connection may be killed or the client
> may
> >> be gone already. An alternative is to return immediately and add a new
> >> request like CheckReplicaDirRequest to see if the movement has
> completed.
> >> The tool can take advantage of that to check the status.
> >>
> >
> > I agree with your concern and solution. We need request to query the
> > partition -> log_directory mapping on the broker. I have updated the KIP
> to
> > remove need for ChangeReplicaDirRequestPurgatory.
> > Instead, 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-09 Thread Dong Lin
I just made one correction in the KIP. If the broker receives
ChangeReplicaDirRequest and the replica hasn't been created there, the
broker will respond with ReplicaNotAvailableException.
The kafka-reassign-partitions.sh tool will need to re-send
ChangeReplicaDirRequest in this case in order to wait for the controller to
send LeaderAndIsrRequest to the broker. The previous approach of creating an
empty directory seems hacky.




On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for your comments! I have updated the KIP to address your comments.
> Please see my reply inline.
>
> Can you let me know if the latest KIP has addressed your comments?
>
> On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply.
>>
>> 1.3 So the thread gets the lock, checks if caught up and releases the lock
>> if not? Then, in the case when there is continuous incoming data, the
>> thread may never get a chance to swap. One way to address this is when the
>> thread is getting really close to catching up, just hold onto the lock
>> until the thread fully catches up.
>>
>
> Yes, that was my original solution. I see your point that the lock may not
> be fairly assigned to ReplicaMoveThread and RequestHandlerThread when there
> are frequent incoming requests. Your solution should address the problem and I
> have updated the KIP to use it.
>
>
>>
>> 2.3 So, you are saying that the partition reassignment tool can first send
>> a ChangeReplicaDirRequest to relevant brokers to establish the log dir for
>> replicas not created yet, then trigger the partition movement across
>> brokers through the controller? That's actually a good idea. Then, we can
>> just leave LeaderAndIsrRequest as it is.
>
>
> Yes, that is what I plan to do. If broker receives a
> ChangeReplicaDirRequest while it is not leader or follower of the
> partition, the broker will create an empty Log instance (i.e. a directory
> named topicPartition) in the destination log directory so that the replica
> will be placed there when the broker receives LeaderAndIsrRequest from the
> controller. The broker should clean up those empty Log instances on startup
> just in case a ChangeReplicaDirRequest was mistakenly sent to a broker that
> was not meant to be follower/leader of the partition.
>
>
>> Another thing related to
>> ChangeReplicaDirRequest.
>> Since this request may take long to complete, I am not sure if we should
>> wait for the movement to complete before respond. While waiting for the
>> movement to complete, the idle connection may be killed or the client may
>> be gone already. An alternative is to return immediately and add a new
>> request like CheckReplicaDirRequest to see if the movement has completed.
>> The tool can take advantage of that to check the status.
>>
>
> I agree with your concern and solution. We need a request to query the
> partition -> log_directory mapping on the broker. I have updated the KIP to
> remove the need for ChangeReplicaDirRequestPurgatory.
> Instead, kafka-reassign-partitions.sh will send DescribeDirsRequest
> to brokers when user wants to verify the partition assignment. Since we
> need this DescribeDirsRequest anyway, we can also use this request to
> expose stats like the individual log size instead of using JMX. One
> drawback of using JMX is that user has to manage the JMX port and related
> credentials if they haven't already done this, which is the case at
> LinkedIn.
>
>
>> Thanks,
>>
>> Jun
>>
>>
>>
>> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin  wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks for the detailed explanation. I will use the separate thread
>> pool to
>> > move replica between log directories. I will let you know when the KIP
>> has
>> > been updated to use a separate thread pool.
>> >
>> > Here is my response to your other questions:
>> >
>> > 1.3 My idea is that the ReplicaMoveThread that moves data should get the
>> > lock before checking whether the replica in the destination log
>> directory
>> > has caught up. If the new replica has caught up, then the
>> ReplicaMoveThread
> > should swap the replica while it is still holding the lock. The
> > ReplicaFetcherThread or RequestHandlerThread will not be able to append
> > data to the replica in the source log directory during this period because
>> they
>> > can not get the lock. Does this address the problem?
>> >
>> > 2.3 I get your point that we want to keep controller simpler. If admin
>> tool
>> > can send ChangeReplicaDirRequest to move data within a broker, then
>> > controller probably doesn't even need to include log directory path in
>> the
>> > LeaderAndIsrRequest. How about this: controller will only deal with
>> > reassignment across brokers as it does now. If the user specified a
>> destination
>> > log directory for any replica, the admin tool will send ChangeReplicaDirRequest
>> and
>> > wait for response from broker to confirm that all replicas have been
>> moved
>> > to the 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-09 Thread Dong Lin
Hey Jun,

Thanks for your comments! I have updated the KIP to address your comments.
Please see my reply inline.

Can you let me know if the latest KIP has addressed your comments?

On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the reply.
>
> 1.3 So the thread gets the lock, checks if caught up and releases the lock
> if not? Then, in the case when there is continuous incoming data, the
> thread may never get a chance to swap. One way to address this is when the
> thread is getting really close to catching up, just hold onto the lock
> until the thread fully catches up.
>

Yes, that was my original solution. I see your point that the lock may not
be fairly assigned to ReplicaMoveThread and RequestHandlerThread when there
are frequent incoming requests. Your solution should address the problem and I
have updated the KIP to use it.


>
> 2.3 So, you are saying that the partition reassignment tool can first send
> a ChangeReplicaDirRequest to relevant brokers to establish the log dir for
> replicas not created yet, then trigger the partition movement across
> brokers through the controller? That's actually a good idea. Then, we can
> just leave LeaderAndIsrRequest as it is.


Yes, that is what I plan to do. If broker receives a
ChangeReplicaDirRequest while it is not leader or follower of the
partition, the broker will create an empty Log instance (i.e. a directory
named topicPartition) in the destination log directory so that the replica
will be placed there when the broker receives LeaderAndIsrRequest from the
controller. The broker should clean up those empty Log instances on startup
just in case a ChangeReplicaDirRequest was mistakenly sent to a broker that
was not meant to be follower/leader of the partition.


> Another thing related to
> ChangeReplicaDirRequest.
> Since this request may take long to complete, I am not sure if we should
> wait for the movement to complete before respond. While waiting for the
> movement to complete, the idle connection may be killed or the client may
> be gone already. An alternative is to return immediately and add a new
> request like CheckReplicaDirRequest to see if the movement has completed.
> The tool can take advantage of that to check the status.
>

I agree with your concern and solution. We need a request to query the
partition -> log_directory mapping on the broker. I have updated the KIP to
remove the need for ChangeReplicaDirRequestPurgatory.
Instead, kafka-reassign-partitions.sh will send DescribeDirsRequest to
brokers when user wants to verify the partition assignment. Since we need
this DescribeDirsRequest anyway, we can also use this request to expose
stats like the individual log size instead of using JMX. One drawback of
using JMX is that user has to manage the JMX port and related credentials
if they haven't already done this, which is the case at LinkedIn.
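As a rough illustration of how the tool could consume such a response, the
sketch below models the partition -> log_directory mapping and size stats
with a stand-in ReplicaInfo type; it is not the real DescribeDirsResponse
schema:

    // Illustrative model of how kafka-reassign-partitions.sh could use a
    // DescribeDirsResponse to verify replica placement and report log sizes.
    // The ReplicaInfo record is a stand-in, not the real response class.
    import java.util.List;
    import java.util.Map;

    public class VerifyReplicaDirs {
        record ReplicaInfo(String topic, int partition, long sizeBytes) {}

        // logDirToReplicas: log directory path -> replicas currently hosted there
        static boolean isMoveComplete(Map<String, List<ReplicaInfo>> logDirToReplicas,
                                      String expectedDir, String topic, int partition) {
            List<ReplicaInfo> replicas = logDirToReplicas.get(expectedDir);
            if (replicas == null) return false;
            return replicas.stream()
                    .anyMatch(r -> r.topic().equals(topic) && r.partition() == partition);
        }

        // Total bytes stored in one log directory, as a JMX-free size stat.
        static long totalSize(Map<String, List<ReplicaInfo>> logDirToReplicas, String dir) {
            return logDirToReplicas.getOrDefault(dir, List.of()).stream()
                    .mapToLong(ReplicaInfo::sizeBytes)
                    .sum();
        }
    }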


> Thanks,
>
> Jun
>
>
>
> On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for the detailed explanation. I will use the separate thread pool
> to
> > move replica between log directories. I will let you know when the KIP
> has
> > been updated to use a separate thread pool.
> >
> > Here is my response to your other questions:
> >
> > 1.3 My idea is that the ReplicaMoveThread that moves data should get the
> > lock before checking whether the replica in the destination log directory
> > has caught up. If the new replica has caught up, then the
> ReplicaMoveThread
> > should swap the replica while it is still holding the lock. The
> > ReplicaFetcherThread or RequestHandlerThread will not be able to append
> > data to the replica in the source log directory during this period because they
> > can not get the lock. Does this address the problem?
> >
> > 2.3 I get your point that we want to keep controller simpler. If admin
> tool
> > can send ChangeReplicaDirRequest to move data within a broker, then
> > controller probably doesn't even need to include log directory path in
> the
> > LeaderAndIsrRequest. How about this: controller will only deal with
> > reassignment across brokers as it does now. If the user specified a
> > destination log directory for any replica, the admin tool will send ChangeReplicaDirRequest
> and
> > wait for response from broker to confirm that all replicas have been
> moved
> > to the destination log directory. The broker will put
> > ChangeReplicaDirRequest in a purgatory and respond either when the
> movement
> > is completed or when the request has timed-out.
> >
> > 4. I agree that we can expose these metrics via JMX. But I am not sure if
> > it can be obtained easily with good performance using either existing
> tools
> > or new script in kafka. I will ask SREs for their opinion.
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> >
> >
> >
> >
> > On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the updated KIP. A few more comments below.
> > >
> > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-08 Thread Jun Rao
Hi, Dong,

Thanks for the reply.

1.3 So the thread gets the lock, checks if caught up and releases the lock
if not? Then, in the case when there is continuous incoming data, the
thread may never get a chance to swap. One way to address this is when the
thread is getting really close to catching up, just hold onto the lock
until the thread fully catches up.
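A minimal sketch of that hand-off, assuming a simple shared lock and
illustrative offset bookkeeping (the real swap involves Log instances and
request handlers, not plain counters):

    // Sketch of "hold the lock once nearly caught up": copy without the lock
    // while far behind, then block appends only for the final stretch and swap.
    import java.util.concurrent.locks.ReentrantLock;

    public class ReplicaMoveSketch {
        private final ReentrantLock logLock = new ReentrantLock();
        private volatile long sourceEndOffset;   // advanced by request handlers
        private volatile long destEndOffset;     // advanced by the move thread
        private static final long NEARLY_CAUGHT_UP = 10_000; // arbitrary threshold

        void moveThreadLoop() {
            while (true) {
                copySomeData();                   // copy a chunk without holding the lock
                if (sourceEndOffset - destEndOffset > NEARLY_CAUGHT_UP) continue;
                logLock.lock();                   // close to caught up: block appends
                try {
                    while (destEndOffset < sourceEndOffset) {
                        copySomeData();           // finish catching up under the lock
                    }
                    swapReplicaDirectories();     // swap while appends are blocked
                    return;
                } finally {
                    logLock.unlock();
                }
            }
        }

        private void copySomeData() { destEndOffset = Math.min(destEndOffset + 1_000, sourceEndOffset); }
        private void swapReplicaDirectories() { /* e.g. rename the .move dir over the old dir */ }
    }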

2.3 So, you are saying that the partition reassignment tool can first send
a ChangeReplicaDirRequest to relevant brokers to establish the log dir for
replicas not created yet, then trigger the partition movement across
brokers through the controller? That's actually a good idea. Then, we can
just leave LeaderAndIsrRequest as it is. Another thing related to
ChangeReplicaDirRequest.
Since this request may take long to complete, I am not sure if we should
wait for the movement to complete before respond. While waiting for the
movement to complete, the idle connection may be killed or the client may
be gone already. An alternative is to return immediately and add a new
request like CheckReplicaDirRequest to see if the movement has completed.
The tool can take advantage of that to check the status.

Thanks,

Jun



On Wed, Mar 8, 2017 at 6:21 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for the detailed explanation. I will use the separate thread pool to
> move replica between log directories. I will let you know when the KIP has
> been updated to use a separate thread pool.
>
> Here is my response to your other questions:
>
> 1.3 My idea is that the ReplicaMoveThread that moves data should get the
> lock before checking whether the replica in the destination log directory
> has caught up. If the new replica has caught up, then the ReplicaMoveThread
> should swaps the replica while it is still holding the lock. The
> ReplicaFetcherThread or RequestHandlerThread will not be able to append
> data to the replica in the source replica during this period because they
> can not get the lock. Does this address the problem?
>
> 2.3 I get your point that we want to keep controller simpler. If admin tool
> can send ChangeReplicaDirRequest to move data within a broker, then
> controller probably doesn't even need to include log directory path in the
> LeaderAndIsrRequest. How about this: controller will only deal with
> reassignment across brokers as it does now. If the user specified a destination
> log directory for any replica, the admin tool will send ChangeReplicaDirRequest and
> wait for response from broker to confirm that all replicas have been moved
> to the destination log directory. The broker will put
> ChangeReplicaDirRequest in a purgatory and respond either when the movement
> is completed or when the request has timed-out.
>
> 4. I agree that we can expose these metrics via JMX. But I am not sure if
> it can be obtained easily with good performance using either existing tools
> or new script in kafka. I will ask SREs for their opinion.
>
> Thanks,
> Dong
>
>
>
>
>
>
>
>
> On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the updated KIP. A few more comments below.
> >
> > 1.1 and 1.2: I am still not sure there is enough benefit of reusing
> > ReplicaFetchThread
> > to move data across disks.
> > (a) A big part of ReplicaFetchThread is to deal with issuing and tracking
> > fetch requests. So, it doesn't feel that we get much from reusing
> > ReplicaFetchThread
> > only to disable the fetching part.
> > (b) The leader replica has no ReplicaFetchThread to start with. It feels
> > weird to start one just for intra broker data movement.
> > (c) The ReplicaFetchThread is per broker. Intuitively, the number of
> > threads doing intra broker data movement should be related to the number
> of
> > disks in the broker, not the number of brokers in the cluster.
> > (d) If the destination disk fails, we want to stop the intra broker data
> > movement, but want to continue inter broker replication. So, logically,
> it
> > seems it's better to separate out the two.
> > (e) I am also not sure if we should reuse the existing throttling for
> > replication. It's designed to handle traffic across brokers and the
> > delaying is done in the fetch request. So, if we are not doing
> > fetching in ReplicaFetchThread,
> > I am not sure the existing throttling is effective. Also, when specifying
> > the throttling of moving data across disks, it seems the user shouldn't
> > care about whether a replica is a leader or a follower. Reusing the
> > existing throttling config name will be awkward in this regard.
> > (f) It seems it's simpler and more consistent to use a separate thread
> pool
> > for local data movement (for both leader and follower replicas). This
> > process can then be configured (e.g. number of threads, etc) and
> throttled
> > independently.
> >
> > 1.3 Yes, we will need some synchronization there. So, if the movement
> > thread catches up, gets the lock to do the swap, but realizes that new
> data
> > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-08 Thread Dong Lin
Hey Jun,

Thanks for the detailed explanation. I will use the separate thread pool to
move replica between log directories. I will let you know when the KIP has
been updated to use a separate thread pool.

Here is my response to your other questions:

1.3 My idea is that the ReplicaMoveThread that moves data should get the
lock before checking whether the replica in the destination log directory
has caught up. If the new replica has caught up, then the ReplicaMoveThread
should swap the replica while it is still holding the lock. The
ReplicaFetcherThread or RequestHandlerThread will not be able to append
data to the replica in the source log directory during this period because they
can not get the lock. Does this address the problem?

2.3 I get your point that we want to keep controller simpler. If admin tool
can send ChangeReplicaDirRequest to move data within a broker, then
controller probably doesn't even need to include log directory path in the
LeaderAndIsrRequest. How about this: controller will only deal with
reassignment across brokers as it does now. If the user specified a destination
log directory for any replica, the admin tool will send ChangeReplicaDirRequest and
wait for response from broker to confirm that all replicas have been moved
to the destination log directory. The broker will put
ChangeReplicaDirRequest in a purgatory and respond either when the movement
is completed or when the request has timed-out.
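A simplified sketch of that respond-on-completion-or-timeout contract, using
a CompletableFuture as a stand-in for the real purgatory machinery:

    // Rough sketch of the delayed-response idea: complete the response when the
    // movement finishes, or report a timeout when the request expires. Kafka's
    // actual purgatory is more involved; this only illustrates the contract.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    public class ChangeReplicaDirPurgatorySketch {
        enum Outcome { COMPLETED, TIMED_OUT }

        static CompletableFuture<Outcome> watchMovement(CompletableFuture<Void> movementDone,
                                                        long timeoutMs) {
            return movementDone
                    .thenApply(v -> Outcome.COMPLETED)
                    .completeOnTimeout(Outcome.TIMED_OUT, timeoutMs, TimeUnit.MILLISECONDS);
        }
    }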

4. I agree that we can expose these metrics via JMX. But I am not sure if
it can be obtained easily with good performance using either existing tools
or new script in kafka. I will ask SREs for their opinion.

Thanks,
Dong








On Wed, Mar 8, 2017 at 1:24 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. A few more comments below.
>
> 1.1 and 1.2: I am still not sure there is enough benefit of reusing
> ReplicaFetchThread
> to move data across disks.
> (a) A big part of ReplicaFetchThread is to deal with issuing and tracking
> fetch requests. So, it doesn't feel that we get much from reusing
> ReplicaFetchThread
> only to disable the fetching part.
> (b) The leader replica has no ReplicaFetchThread to start with. It feels
> weird to start one just for intra broker data movement.
> (c) The ReplicaFetchThread is per broker. Intuitively, the number of
> threads doing intra broker data movement should be related to the number of
> disks in the broker, not the number of brokers in the cluster.
> (d) If the destination disk fails, we want to stop the intra broker data
> movement, but want to continue inter broker replication. So, logically, it
> seems it's better to separate out the two.
> (e) I am also not sure if we should reuse the existing throttling for
> replication. It's designed to handle traffic across brokers and the
> delaying is done in the fetch request. So, if we are not doing
> fetching in ReplicaFetchThread,
> I am not sure the existing throttling is effective. Also, when specifying
> the throttling of moving data across disks, it seems the user shouldn't
> care about whether a replica is a leader or a follower. Reusing the
> existing throttling config name will be awkward in this regard.
> (f) It seems it's simpler and more consistent to use a separate thread pool
> for local data movement (for both leader and follower replicas). This
> process can then be configured (e.g. number of threads, etc) and throttled
> independently.
>
> 1.3 Yes, we will need some synchronization there. So, if the movement
> thread catches up, gets the lock to do the swap, but realizes that new data
> is added, it has to continue catching up while holding the lock?
>
> 2.3 The benefit of including the desired log directory in
> LeaderAndIsrRequest
> during partition reassignment is that the controller doesn't need to track
> the progress for disk movement. So, you don't need the additional
> BrokerDirStateUpdateRequest. Then the controller never needs to issue
> ChangeReplicaDirRequest.
> Only the admin tool will issue ChangeReplicaDirRequest to move data within
> a broker. I agree that this makes LeaderAndIsrRequest more complicated, but
> that seems simpler than changing the controller to track additional states
> during partition reassignment.
>
> 4. We want to make a decision on how to expose the stats. So far, we are
> exposing stats like the individual log size as JMX. So, one way is to just
> add new jmx to expose the log directory of individual replicas.
>
> Thanks,
>
> Jun
>
>
> On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for all the comments! Please see my answer below. I have updated
> the
> > KIP to address most of the questions and make the KIP easier to
> understand.
> >
> > Thanks,
> > Dong
> >
> > On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the KIP. A few comments below.
> > >
> > > 1. For moving data across directories
> > > 1.1 I am not sure 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-08 Thread Jun Rao
Hi, Dong,

Thanks for the updated KIP. A few more comments below.

1.1 and 1.2: I am still not sure there is enough benefit of reusing
ReplicaFetchThread
to move data across disks.
(a) A big part of ReplicaFetchThread is to deal with issuing and tracking
fetch requests. So, it doesn't feel that we get much from reusing
ReplicaFetchThread
only to disable the fetching part.
(b) The leader replica has no ReplicaFetchThread to start with. It feels
weird to start one just for intra broker data movement.
(c) The ReplicaFetchThread is per broker. Intuitively, the number of
threads doing intra broker data movement should be related to the number of
disks in the broker, not the number of brokers in the cluster.
(d) If the destination disk fails, we want to stop the intra broker data
movement, but want to continue inter broker replication. So, logically, it
seems it's better to separate out the two.
(e) I am also not sure if we should reuse the existing throttling for
replication. It's designed to handle traffic across brokers and the
delaying is done in the fetch request. So, if we are not doing
fetching in ReplicaFetchThread,
I am not sure the existing throttling is effective. Also, when specifying
the throttling of moving data across disks, it seems the user shouldn't
care about whether a replica is a leader or a follower. Reusing the
existing throttling config name will be awkward in this regard.
(f) It seems it's simpler and more consistent to use a separate thread pool
for local data movement (for both leader and follower replicas). This
process can then be configured (e.g. number of threads, etc) and throttled
independently.
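For illustration only, a dedicated pool along the lines of point (f) might
look like the sketch below; "num.replica.move.threads" is a hypothetical
config name, and the default ties the pool size to the number of log
directories as suggested in point (c):

    // Sketch of a separately configured and throttled pool for intra-broker
    // replica movement, independent of the ReplicaFetcherThreads.
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ReplicaMovePool {
        private final ExecutorService pool;

        ReplicaMovePool(List<String> logDirs, int configuredThreads) {
            // Tie the default pool size to the number of log directories, not brokers.
            int threads = configuredThreads > 0 ? configuredThreads : logDirs.size();
            this.pool = Executors.newFixedThreadPool(threads);
        }

        void submitMove(Runnable moveTask) {
            pool.submit(moveTask);
        }

        void shutdown() {
            pool.shutdown();
        }
    }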

1.3 Yes, we will need some synchronization there. So, if the movement
thread catches up, gets the lock to do the swap, but realizes that new data
is added, it has to continue catching up while holding the lock?

2.3 The benefit of including the desired log directory in LeaderAndIsrRequest
during partition reassignment is that the controller doesn't need to track
the progress for disk movement. So, you don't need the additional
BrokerDirStateUpdateRequest. Then the controller never needs to issue
ChangeReplicaDirRequest.
Only the admin tool will issue ChangeReplicaDirRequest to move data within
a broker. I agree that this makes LeaderAndIsrRequest more complicated, but
that seems simpler than changing the controller to track additional states
during partition reassignment.

4. We want to make a decision on how to expose the stats. So far, we are
exposing stats like the individual log size as JMX. So, one way is to just
add new jmx to expose the log directory of individual replicas.

Thanks,

Jun


On Thu, Mar 2, 2017 at 11:18 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for all the comments! Please see my answer below. I have updated the
> KIP to address most of the questions and make the KIP easier to understand.
>
> Thanks,
> Dong
>
> On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 1. For moving data across directories
> > 1.1 I am not sure why we want to use ReplicaFetcherThread to move data
> > around in the leader. ReplicaFetchThread fetches data from socket. For
> > moving data locally, it seems that we want to avoid the socket overhead.
> >
>
> The purpose of using ReplicaFetchThread is to re-use an existing thread
> instead of creating more threads and making our thread model more complex. It
> seems like a natural choice for copying data between disks since it is
> similar to copying data between brokers. Another reason is that if the
> replica to be moved is a follower, we don't need lock to swap replicas when
> destination replica has caught up, since the same thread which is fetching
> data from leader will swap the replica.
>
> The ReplicaFetchThread will not incur socket overhead while copying data
> between disks. It will read directly from source disk (as we do when
> processing FetchRequest) and write to destination disk (as we do when
> processing ProduceRequest).
>
>
> > 1.2 I am also not sure about moving data in the ReplicaFetcherThread in
> the
> > follower. For example, I am not sure setting replica.fetch.max.wait to 0
> >  is ideal. It may not always be effective since a fetch request in the
> > ReplicaFetcherThread could be arbitrarily delayed due to replication
> > throttling on the leader. In general, the data movement logic across
> disks
> > seems different from that in ReplicaFetcherThread. So, I am not sure why
> > they need to be coupled.
> >
>
> While it may not be the most efficient way to copy data between local
> disks, it will be at least as efficient as copying data from leader to the
> destination disk. The expected goal of KIP-113 is to enable data movement
> between disks with no less efficiency than what we do now when moving data
> between brokers. I think we can optimize its performance using separate
> thread if the performance is not 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-02 Thread Dong Lin
Hey Jun,

Thanks for all the comments! Please see my answer below. I have updated the
KIP to address most of the questions and make the KIP easier to understand.

Thanks,
Dong

On Thu, Mar 2, 2017 at 9:35 AM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the KIP. A few comments below.
>
> 1. For moving data across directories
> 1.1 I am not sure why we want to use ReplicaFetcherThread to move data
> around in the leader. ReplicaFetchThread fetches data from socket. For
> moving data locally, it seems that we want to avoid the socket overhead.
>

The purpose of using ReplicaFetchThread is to re-use an existing thread
instead of creating more threads and making our thread model more complex. It
seems like a natural choice for copying data between disks since it is
similar to copying data between brokers. Another reason is that if the
replica to be moved is a follower, we don't need lock to swap replicas when
destination replica has caught up, since the same thread which is fetching
data from leader will swap the replica.

The ReplicaFetchThread will not incur socket overhead while copying data
between disks. It will read directly from source disk (as we do when
processing FetchRequest) and write to destination disk (as we do when
processing ProduceRequest).


> 1.2 I am also not sure about moving data in the ReplicaFetcherThread in the
> follower. For example, I am not sure setting replica.fetch.max.wait to 0
>  is ideal. It may not always be effective since a fetch request in the
> ReplicaFetcherThread could be arbitrarily delayed due to replication
> throttling on the leader. In general, the data movement logic across disks
> seems different from that in ReplicaFetcherThread. So, I am not sure why
> they need to be coupled.
>

While it may not be the most efficient way to copy data between local
disks, it will be at least as efficient as copying data from leader to the
destination disk. The expected goal of KIP-113 is to enable data movement
between disks with no less efficiency than what we do now when moving data
between brokers. I think we can optimize its performance using separate
thread if the performance is not good enough.


> 1.3 Could you add a bit more details on how we swap the replicas when the
> new ones are fully caught up? For example, what happens when the new
> replica in the new log directory is caught up, but when we want to do the
> swap, some new data has arrived?
>

If the replica is a leader, then ReplicaFetcherThread will perform the
replacement. Proper lock is needed to prevent KafkaRequestHandler from
appending data to the topicPartition.log on the source disks before this
replacement is completed by ReplicaFetcherThread.

If the replica is a follower, because the same ReplicaFetchThread which
fetches data from leader will also swap the replica, no lock is needed.

I have updated the KIP to specify both more explicitly.



> 1.4 Do we need to do the .move at the log segment level or could we just do
> that at the replica directory level? Renaming just a directory is much
> faster than renaming the log segments.
>

Great point. I have updated the KIP to rename the log directory instead.
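As a sketch of what a directory-level swap could look like (paths and the
".move"/".delete" suffixes are illustrative; ATOMIC_MOVE only applies within
one file system, so the cross-disk copy must already have completed):

    // Sketch of swapping at the replica-directory level once the copy has
    // caught up, instead of renaming individual log segments.
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class DirectorySwap {
        // destMoveDir: e.g. <dest log dir>/topic-0.move, sourceDir: <source log dir>/topic-0
        static void swap(Path destMoveDir, Path sourceDir) throws IOException {
            // Promote topic-0.move -> topic-0 inside the destination log directory.
            Path destFinal = destMoveDir.resolveSibling(stripMoveSuffix(destMoveDir.getFileName().toString()));
            Files.move(destMoveDir, destFinal, StandardCopyOption.ATOMIC_MOVE);
            // Retire the old copy on the source disk; it can be deleted asynchronously.
            Files.move(sourceDir, sourceDir.resolveSibling(sourceDir.getFileName() + ".delete"),
                       StandardCopyOption.ATOMIC_MOVE);
        }

        private static String stripMoveSuffix(String name) {
            return name.endsWith(".move") ? name.substring(0, name.length() - ".move".length()) : name;
        }
    }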


> 1.5 Could you also describe a bit what happens when either the source or
> the target log directory fails while the data moving is in progress?
>

If source log directory fails, then the replica movement will stop and the
source replica is marked offline. If destination log directory fails, then
the replica movement will stop. I have updated the KIP to clarify this.


>
> 2. For partition reassignment.
> 2.1 I am not sure if the controller can block on ChangeReplicaDirRequest.
> Data movement may take a long time to complete. If there is an outstanding
> request from the controller to a broker, that broker won't be able to
> process any new request from the controller. So if another event (e.g.
> broker failure) happens when the data movement is in progress, subsequent
> LeaderAndIsrRequest will be delayed.
>

Yeah, good point. I missed the fact that there is only one inflight
request from controller to broker.

How about I add a request, e.g. BrokerDirStateUpdateRequest, which maps
topicPartition to log directory and can be sent from broker to controller
to indicate completion?



> 2.2 in the KIP, the partition reassignment tool is also used for cases
> where an admin just wants to balance the existing data across log
> directories in the broker. In this case, it seems that it's overkill to
> have the process go through the controller. A simpler approach is to issue
> an RPC request to the broker directly.
>

I agree we can optimize this case. It is just that we have to add new logic
or code path to handle a scenario that is already covered by the more
complicated scenario. I will add it to the KIP.


> 2.3 When using the partition reassignment tool to move replicas across
> brokers, it makes sense to be able to specify the log directory of the newly
> created replicas. The 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-02 Thread Jun Rao
Hi, Dong,

Thanks for the KIP. A few comments below.

1. For moving data across directories
1.1 I am not sure why we want to use ReplicaFetcherThread to move data
around in the leader. ReplicaFetchThread fetches data from socket. For
moving data locally, it seems that we want to avoid the socket overhead.
1.2 I am also not sure about moving data in the ReplicaFetcherThread in the
follower. For example, I am not sure setting replica.fetch.max.wait to 0
 is ideal. It may not always be effective since a fetch request in the
ReplicaFetcherThread could be arbitrarily delayed due to replication
throttling on the leader. In general, the data movement logic across disks
seems different from that in ReplicaFetcherThread. So, I am not sure why
they need to be coupled.
1.3 Could you add a bit more details on how we swap the replicas when the
new ones are fully caught up? For example, what happens when the new
replica in the new log directory is caught up, but when we want to do the
swap, some new data has arrived?
1.4 Do we need to do the .move at the log segment level or could we just do
that at the replica directory level? Renaming just a directory is much
faster than renaming the log segments.
1.5 Could you also describe a bit what happens when either the source or
the target log directory fails while the data moving is in progress?

2. For partition reassignment.
2.1 I am not sure if the controller can block on ChangeReplicaDirRequest.
Data movement may take a long time to complete. If there is an outstanding
request from the controller to a broker, that broker won't be able to
process any new request from the controller. So if another event (e.g.
broker failure) happens when the data movement is in progress, subsequent
LeaderAndIsrRequest will be delayed.
2.2 in the KIP, the partition reassignment tool is also used for cases
where an admin just wants to balance the existing data across log
directories in the broker. In this case, it seems that it's overkill to
have the process go through the controller. A simpler approach is to issue
an RPC request to the broker directly.
2.3 When using the partition reassignment tool to move replicas across
brokers, it makes sense to be able to specify the log directory of the newly
created replicas. The KIP does that in two separate requests
ChangeReplicaDirRequest and LeaderAndIsrRequest, and tracks the progress of
each independently. An alternative is to do that just in LeaderAndIsrRequest.
That way, the new replicas will be created in the right log dir in the
first place and the controller just needs to track the progress of
partition reassignment in the current way.

3. /admin/reassign_partitions: Including the log dir in every replica may
not be efficient. We could include a list of log directories and reference
the index of the log directory in each replica.

4. DescribeDirsRequest: The stats in the request are already available from
JMX. Do we need the new request?

5. We want to be consistent on ChangeReplicaDirRequest vs
ChangeReplicaRequest.

Thanks,

Jun


On Fri, Feb 3, 2017 at 6:19 PM, Dong Lin  wrote:

> Hey ALexey,
>
> Thanks for all the comments!
>
> I have updated the KIP to specify how we enforce quota. I also updated the
> "The thread model and broker logic for moving replica data between log
> directories" to make it easier to read. You can find the exact change here
>  n.action?pageId=67638408=5=6>.
> The idea is to use the same replication quota mechanism introduced in
> KIP-73.
>
> Thanks,
> Dong
>
>
>
> On Wed, Feb 1, 2017 at 2:16 AM, Alexey Ozeritsky 
> wrote:
>
> >
> >
> > 24.01.2017, 22:03, "Dong Lin" :
> > > Hey Alexey,
> > >
> > > Thanks. I think we agreed that the suggested solution doesn't work in
> > > general for kafka users. To answer your questions:
> > >
> > > 1. I agree we need quota to rate limit replica movement when a broker
> is
> > > moving a "leader" replica. I will come up with solution, probably
> re-use
> > > the config of replication quota introduced in KIP-73.
> > >
> > > 2. Good point. I agree that this is a problem in general. If there is no new
> > data
> > > on that broker, with current default value of
> replica.fetch.wait.max.ms
> > > and replica.fetch.max.bytes, the replica will be moved at only 2 MBps
> > > throughput. I think the solution is for broker to set
> > > replica.fetch.wait.max.ms to 0 in its FetchRequest if the
> corresponding
> > > ReplicaFetcherThread needs to move some replica to another disk.
> > >
> > > 3. I have updated the KIP to mention that the read size of a given
> > > partition is configured using replica.fetch.max.bytes when we move
> > replicas
> > > between disks.
> > >
> > > Please see this
> > >  > pageId=67638408=4=5>
> > > for the change of the KIP. I will come up with a solution to throttle
> > > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-02-01 Thread Alexey Ozeritsky


24.01.2017, 22:03, "Dong Lin" :
> Hey Alexey,
>
> Thanks. I think we agreed that the suggested solution doesn't work in
> general for kafka users. To answer your questions:
>
> 1. I agree we need quota to rate limit replica movement when a broker is
> moving a "leader" replica. I will come up with solution, probably re-use
> the config of replication quota introduced in KIP-73.
>
> 2. Good point. I agree that this is a problem in general. If there is no new data
> on that broker, with current default value of replica.fetch.wait.max.ms
> and replica.fetch.max.bytes, the replica will be moved at only 2 MBps
> throughput. I think the solution is for broker to set
> replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding
> ReplicaFetcherThread needs to move some replica to another disk.
>
> 3. I have updated the KIP to mention that the read size of a given
> partition is configured using replica.fetch.max.bytes when we move replicas
> between disks.
>
> Please see this
> 
> for the change of the KIP. I will come up with a solution to throttle
> replica movement when a broker is moving a "leader" replica.

Thanks. It looks great. 

>
> On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky 
> wrote:
>
>>  23.01.2017, 22:11, "Dong Lin" :
>>  > Thanks. Please see my comment inline.
>>  >
>>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky 
>>  > wrote:
>>  >
>>  >> 13.01.2017, 22:29, "Dong Lin" :
>>  >> > Hey Alexey,
>>  >> >
>>  >> > Thanks for your review and the alternative approach. Here is my
>>  >> > understanding of your patch. kafka's background threads are used to
>>  move
>>  >> > data between replicas. When data movement is triggered, the log will
>>  be
>>  >> > rolled and the new logs will be put in the new directory, and
>>  background
>>  >> > threads will move segment from old directory to new directory.
>>  >> >
>>  >> > It is important to note that KIP-112 is intended to work with
>>  KIP-113 to
>>  >> > support JBOD. I think your solution is definitely simpler and better
>>  >> under
>>  >> > the current kafka implementation that a broker will fail if any disk
>>  >> fails.
>>  >> > But I am not sure if we want to allow broker to run with partial
>>  disks
>>  >> > failure. Let's say a replica is being moved from log_dir_old to
>>  >> > log_dir_new and then log_dir_old stops working due to disk failure.
>>  How
>>  >> > would your existing patch handle it? To make the scenario a bit more
>>  >>
>>  >> We will lose log_dir_old. After broker restart we can read the data
>>  from
>>  >> log_dir_new.
>>  >
>>  > No, you probably can't. This is because the broker doesn't have *all* the
>>  > data for this partition. For example, say the broker has
>>  > partition_segment_1, partition_segment_50 and partition_segment_100 on
>>  the
>>  > log_dir_old. partition_segment_100, which has the latest data, has been
>>  > moved to log_dir_new, and the log_dir_old fails before
>>  partition_segment_50
>>  > and partition_segment_1 is moved to log_dir_new. When broker re-starts,
>>  it
>>  > won't have partition_segment_50. This causes problem if broker is elected
>>  > leader and consumer wants to consume data in the partition_segment_1.
>>
>>  Right.
>>
>>  >
>>  >> > complicated, let's say the broker is shut down, log_dir_old's disk
>>  fails,
>>  >> > and the broker starts. In this case broker doesn't even know if
>>  >> log_dir_new
>>  >> > has all the data needed for this replica. It becomes a problem if the
>>  >> > broker is elected leader of this partition in this case.
>>  >>
>>  >> log_dir_new contains the most recent data so we will lose the tail of
>>  >> partition.
>>  >> This is not a big problem for us because we already delete tails by
>>  hand
>>  >> (see https://issues.apache.org/jira/browse/KAFKA-1712).
>>  >> Also we don't use automatic leader balancing
>>  (auto.leader.rebalance.enable=false),
>>  >> so this partition becomes the leader with a low probability.
>>  >> I think my patch can be modified to prohibit the selection of the
>>  leader
>>  >> until the partition has moved completely.
>>  >
>>  > I guess you are saying that you have deleted the tails by hand in your
>>  own
>>  > kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am
>>  not
>>
>>  No. We just modify segments mtime by cron job. This works with vanilla
>>  kafka.
>>
>>  > sure if it is the right solution. How would this solution address the
>>  > problem mentioned above?
>>
>>  If you need only fresh data and if you remove old data by hand this is
>>  not a problem. But in the general case
>>  this is a problem of course.
>>
>>  >
>>  > BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way
>>  to
>>  > address its problem. Now that we have timestamp in the message we can 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-24 Thread Dong Lin
Hey Alexey,

Thanks. I think we agreed that the suggested solution doesn't work in
general for kafka users. To answer your questions:

1. I agree we need quota to rate limit replica movement when a broker is
moving a "leader" replica. I will come up with solution, probably re-use
the config of replication quota introduced in KIP-73.

2. Good point. I agree that this is a problem in general. If there is no new data
on that broker, with current default value of replica.fetch.wait.max.ms
and replica.fetch.max.bytes, the replica will be moved at only 2 MBps
throughput. I think the solution is for broker to set
replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding
ReplicaFetcherThread needs to move some replica to another disk.

3. I have updated the KIP to mention that the read size of a given
partition is configured using replica.fetch.max.bytes when we move replicas
between disks.
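Schematically, the two settings interact as below; replica.fetch.max.bytes
and replica.fetch.wait.max.ms are existing broker configs, while forcing the
wait to 0 for a moving replica is the behaviour proposed in point 2 (the
helper class itself is purely illustrative):

    // Illustrative helper tying the two existing fetch configs to the proposed
    // behaviour: zero wait for fetches that also drive a local replica move.
    public class MoveFetchParams {
        static final String MAX_BYTES_CONFIG = "replica.fetch.max.bytes";
        static final String MAX_WAIT_CONFIG = "replica.fetch.wait.max.ms";

        // maxWaitMs controls how long the leader may park the fetch; a fetcher
        // that also needs to move a replica locally should not wait for new data.
        static int effectiveMaxWaitMs(int configuredMaxWaitMs, boolean movingReplica) {
            return movingReplica ? 0 : configuredMaxWaitMs;
        }

        // The per-partition read size for the local copy reuses the fetch size config.
        static int readSizeBytes(int configuredFetchMaxBytes) {
            return configuredFetchMaxBytes;
        }
    }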

Please see this

for the change of the KIP. I will come up with a solution to throttle
replica movement when a broker is moving a "leader" replica.

On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky 
wrote:

>
>
> 23.01.2017, 22:11, "Dong Lin" :
> > Thanks. Please see my comment inline.
> >
> > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky 
> > wrote:
> >
> >>  13.01.2017, 22:29, "Dong Lin" :
> >>  > Hey Alexey,
> >>  >
> >>  > Thanks for your review and the alternative approach. Here is my
> >>  > understanding of your patch. kafka's background threads are used to
> move
> >>  > data between replicas. When data movement is triggered, the log will
> be
> >>  > rolled and the new logs will be put in the new directory, and
> background
> >>  > threads will move segment from old directory to new directory.
> >>  >
> >>  > It is important to note that KIP-112 is intended to work with
> KIP-113 to
> >>  > support JBOD. I think your solution is definitely simpler and better
> >>  under
> >>  > the current kafka implementation that a broker will fail if any disk
> >>  fails.
> >>  > But I am not sure if we want to allow broker to run with partial
> disks
> >>  > failure. Let's say a replica is being moved from log_dir_old to
> >>  > log_dir_new and then log_dir_old stops working due to disk failure.
> How
> >>  > would your existing patch handle it? To make the scenario a bit more
> >>
> >>  We will lose log_dir_old. After broker restart we can read the data
> from
> >>  log_dir_new.
> >
> > No, you probably can't. This is because the broker doesn't have *all* the
> > data for this partition. For example, say the broker has
> > partition_segment_1, partition_segment_50 and partition_segment_100 on
> the
> > log_dir_old. partition_segment_100, which has the latest data, has been
> > moved to log_dir_new, and the log_dir_old fails before
> partition_segment_50
> > and partition_segment_1 is moved to log_dir_new. When broker re-starts,
> it
> > won't have partition_segment_50. This causes problem if broker is elected
> > leader and consumer wants to consume data in the partition_segment_1.
>
> Right.
>
> >
> >>  > complicated, let's say the broker is shut down, log_dir_old's disk
> fails,
> >>  > and the broker starts. In this case broker doesn't even know if
> >>  log_dir_new
> >>  > has all the data needed for this replica. It becomes a problem if the
> >>  > broker is elected leader of this partition in this case.
> >>
> >>  log_dir_new contains the most recent data so we will lose the tail of
> >>  partition.
> >>  This is not a big problem for us because we already delete tails by
> hand
> >>  (see https://issues.apache.org/jira/browse/KAFKA-1712).
> >>  Also we don't use automatic leader balancing
> (auto.leader.rebalance.enable=false),
> >>  so this partition becomes the leader with a low probability.
> >>  I think my patch can be modified to prohibit the selection of the
> leader
> >>  until the partition has moved completely.
> >
> > I guess you are saying that you have deleted the tails by hand in your
> own
> > kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am
> not
>
> No. We just modify segments mtime by cron job. This works with vanilla
> kafka.
>
> > sure if it is the right solution. How would this solution address the
> > problem mentioned above?
>
> If you need only fresh data and if you remove old data by hand this is
> not a problem. But in the general case
> this is a problem of course.
>
> >
> > BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way
> to
> > address its problem. Now that we have timestamp in the message we can use
> > that to delete old segment instead of relying on the log segment mtime.
> > Just some idea and we don't have to discuss this problem here.
> >
> >>  >
> >>  > The solution presented in the KIP attempts to handle it by replacing
> >>  > replica in an atomic 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-24 Thread Alexey Ozeritsky


23.01.2017, 22:11, "Dong Lin" :
> Thanks. Please see my comment inline.
>
> On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky 
> wrote:
>
>>  13.01.2017, 22:29, "Dong Lin" :
>>  > Hey Alexey,
>>  >
>>  > Thanks for your review and the alternative approach. Here is my
>>  > understanding of your patch. kafka's background threads are used to move
>>  > data between replicas. When data movement is triggered, the log will be
>>  > rolled and the new logs will be put in the new directory, and background
>>  > threads will move segment from old directory to new directory.
>>  >
>>  > It is important to note that KIP-112 is intended to work with KIP-113 to
>>  > support JBOD. I think your solution is definitely simpler and better
>>  under
>>  > the current kafka implementation that a broker will fail if any disk
>>  fails.
>>  > But I am not sure if we want to allow broker to run with partial disks
>>  > failure. Let's say a replica is being moved from log_dir_old to
>>  > log_dir_new and then log_dir_old stops working due to disk failure. How
>>  > would your existing patch handle it? To make the scenario a bit more
>>
>>  We will lose log_dir_old. After broker restart we can read the data from
>>  log_dir_new.
>
> No, you probably can't. This is because the broker doesn't have *all* the
> data for this partition. For example, say the broker has
> partition_segment_1, partition_segment_50 and partition_segment_100 on the
> log_dir_old. partition_segment_100, which has the latest data, has been
> moved to log_dir_new, and the log_dir_old fails before partition_segment_50
> and partition_segment_1 is moved to log_dir_new. When broker re-starts, it
> won't have partition_segment_50. This causes problem if broker is elected
> leader and consumer wants to consume data in the partition_segment_1.

Right.

>
>>  > complicated, let's say the broker is shut down, log_dir_old's disk fails,
>>  > and the broker starts. In this case broker doesn't even know if
>>  log_dir_new
>>  > has all the data needed for this replica. It becomes a problem if the
>>  > broker is elected leader of this partition in this case.
>>
>>  log_dir_new contains the most recent data so we will lose the tail of
>>  partition.
>>  This is not a big problem for us because we already delete tails by hand
>>  (see https://issues.apache.org/jira/browse/KAFKA-1712).
>>  Also we don't use automatic leader balancing
>> (auto.leader.rebalance.enable=false),
>>  so this partition becomes the leader with a low probability.
>>  I think my patch can be modified to prohibit the selection of the leader
>>  until the partition has moved completely.
>
> I guess you are saying that you have deleted the tails by hand in your own
> kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am not

No. We just modify segments mtime by cron job. This works with vanilla kafka.

> sure if it is the right solution. How would this solution address the
> problem mentioned above?

If you need only fresh data and if you remove old data by hand this is not a
problem. But in the general case
this is a problem of course.

>
> BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to
> address its problem. Now that we have timestamp in the message we can use
> that to delete old segment instead of relying on the log segment mtime.
> Just some idea and we don't have to discuss this problem here.
>
>>  >
>>  > The solution presented in the KIP attempts to handle it by replacing
>>  > replica in an atomic fashion after the log in the new dir has
>>  fully
>>  > caught up with the log in the old dir. At any time the log can be
>>  considered
>>  > to exist on only one log directory.
>>
>>  As I understand your solution does not cover quotas.
>>  What happens if someone starts to transfer 100 partitions ?
>
> Good point. Quota can be implemented in the future. It is currently
> mentioned as a potential future improvement in KIP-112. Thanks
> for the reminder. I will move it to KIP-113.
>
>>  > If yes, it will read a ByteBufferMessageSet from topicPartition.log and
>>  append the message set to topicPartition.move
>>
>>  i.e. processPartitionData will read data from the beginning of
>>  topicPartition.log? What is the read size?
>>  A ReplicaFetchThread reads many partitions so if one does some complicated
>>  work (= read a lot of data from disk) everything will slow down.
>>  I think read size should not be very big.
>>
>>  On the other hand at this point (processPartitionData) one can use only
>>  the new data (ByteBufferMessageSet from parameters) and wait until
>>  (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
>>  && topicPartition.log.largestOffset == topicPartition.log.largestOffset).
>>  In this case the write speed to topicPartition.move and 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-23 Thread Dong Lin
Thanks. Please see my comment inline.

On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky 
wrote:

>
>
> 13.01.2017, 22:29, "Dong Lin" :
> > Hey Alexey,
> >
> > Thanks for your review and the alternative approach. Here is my
> > understanding of your patch. kafka's background threads are used to move
> > data between replicas. When data movement is triggered, the log will be
> > rolled and the new logs will be put in the new directory, and background
> > threads will move segment from old directory to new directory.
> >
> > It is important to note that KIP-112 is intended to work with KIP-113 to
> > support JBOD. I think your solution is definitely simpler and better
> under
> > the current kafka implementation that a broker will fail if any disk
> fails.
> > But I am not sure if we want to allow broker to run with partial disks
> > failure. Let's say a replica is being moved from log_dir_old to
> > log_dir_new and then log_dir_old stops working due to disk failure. How
> > would your existing patch handle it? To make the scenario a bit more
>
> We will lose log_dir_old. After broker restart we can read the data from
> log_dir_new.
>

No, you probably can't. This is because the broker doesn't have *all* the
data for this partition. For example, say the broker has
partition_segment_1, partition_segment_50 and partition_segment_100 on the
log_dir_old. partition_segment_100, which has the latest data, has been
moved to log_dir_new, and the log_dir_old fails before partition_segment_50
and partition_segment_1 is moved to log_dir_new. When broker re-starts, it
won't have partition_segment_50. This causes problem if broker is elected
leader and consumer wants to consume data in the partition_segment_1.


>
> > complicated, let's say the broker is shut down, log_dir_old's disk fails,
> > and the broker starts. In this case broker doesn't even know if
> log_dir_new
> > has all the data needed for this replica. It becomes a problem if the
> > broker is elected leader of this partition in this case.
>
> log_dir_new contains the most recent data so we will lose the tail of
> partition.
> This is not a big problem for us because we already delete tails by hand
> (see https://issues.apache.org/jira/browse/KAFKA-1712).
> Also we don't use automatic leader balancing
> (auto.leader.rebalance.enable=false),
> so this partition becomes the leader with a low probability.
> I think my patch can be modified to prohibit the selection of the leader
> until the partition has moved completely.
>

I guess you are saying that you have deleted the tails by hand in your own
kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am not
sure if it is the right solution. How would this solution address the
problem mentioned above?

BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to
address its problem. Now that we have timestamp in the message we can use
that to delete old segment instead of relying on the log segment mtime.
Just some idea and we don't have to discuss this problem here.


>
> >
> > The solution presented in the KIP attempts to handle it by replacing
> > replica in an atomic fashion after the log in the new dir has
> fully
> > caught up with the log in the old dir. At any time the log can be
> considered
> > to exist on only one log directory.
>
> As I understand your solution does not cover quotas.
> What happens if someone starts to transfer 100 partitions ?
>

Good point. Quota can be implemented in the future. It is currently
mentioned as a potential future improvement in KIP-112. Thanks
for the reminder. I will move it to KIP-113.


>
> > If yes, it will read a ByteBufferMessageSet from topicPartition.log and
> append the message set to topicPartition.move
>
> i.e. processPartitionData will read data from the beginning of
> topicPartition.log? What is the read size?
> A ReplicaFetchThread reads many partitions so if one does some complicated
> work (= read a lot of data from disk) everything will slow down.
> I think read size should not be very big.
>
> On the other hand at this point (processPartitionData) one can use only
> the new data (ByteBufferMessageSet  from parameters) and wait until
> (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
> && topicPartition.log.largestOffset == topicPartition.log.largestOffset).
> In this case the write speed to topicPartition.move and topicPartition.log
> will be the same so this will allow us to move many partitions to one disk.
>
>
The read size of a given partition is configured
using replica.fetch.max.bytes, which is the same size used by FetchRequest
from follower to leader. If the broker is moving a replica for which it
acts as a follower, the disk write rate for moving this replica is at most
the rate it fetches from leader (assume it is catching up and has

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-23 Thread Alexey Ozeritsky


13.01.2017, 22:29, "Dong Lin" :
> Hey Alexey,
>
> Thanks for your review and the alternative approach. Here is my
> understanding of your patch. kafka's background threads are used to move
> data between replicas. When data movement is triggered, the log will be
> rolled and the new logs will be put in the new directory, and background
> threads will move segment from old directory to new directory.
>
> It is important to note that KIP-112 is intended to work with KIP-113 to
> support JBOD. I think your solution is definitely simpler and better under
> the current kafka implementation that a broker will fail if any disk fails.
> But I am not sure if we want to allow broker to run with partial disks
> failure. Let's say a replica is being moved from log_dir_old to
> log_dir_new and then log_dir_old stops working due to disk failure. How
> would your existing patch handle it? To make the scenario a bit more

We will lose log_dir_old. After broker restart we can read the data from 
log_dir_new.

> complicated, let's say the broker is shut down, log_dir_old's disk fails,
> and the broker starts. In this case broker doesn't even know if log_dir_new
> has all the data needed for this replica. It becomes a problem if the
> broker is elected leader of this partition in this case.

log_dir_new contains the most recent data so we will lose the tail of partition.
This is not a big problem for us because we already delete tails by hand (see 
https://issues.apache.org/jira/browse/KAFKA-1712).
Also we don't use automatic leader balancing
(auto.leader.rebalance.enable=false), so this partition becomes the leader with 
a low probability.
I think my patch can be modified to prohibit the selection of the leader until 
the partition has moved completely.

>
> The solution presented in the KIP attempts to handle it by replacing
> replica in an atomic fashion after the log in the new dir has fully
> caught up with the log in the old dir. At any time the log can be considered
> to exist on only one log directory.

As I understand your solution does not cover quotas.
What happens if someone starts to transfer 100 partitions ? 

> If yes, it will read a ByteBufferMessageSet from topicPartition.log and 
> append the message set to topicPartition.move

i.e. processPartitionData will read data from the beginning of
topicPartition.log? What is the read size?
A ReplicaFetchThread reads many partitions, so if one of them does heavy work
(i.e. reads a lot of data from disk), everything will slow down.
I think the read size should not be very big.

On the other hand, at this point (processPartitionData) one can use only the new
data (the ByteBufferMessageSet from the parameters) and wait until
(topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset &&
topicPartition.move.largestOffset == topicPartition.log.largestOffset).
In this case the write speed to topicPartition.move and topicPartition.log will
be the same, so this will allow us to move many partitions to one disk.
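
A self-contained sketch of that catch-up condition, with topicPartition.log and
topicPartition.move reduced to hypothetical offset ranges (the real Log class
exposes much more than this):

// Hypothetical sketch of the "move log has caught up" check described above.
object MoveCatchUpSketch {

  final case class OffsetRange(smallestOffset: Long, largestOffset: Long)

  // The copy on the destination disk is caught up once it covers at least the
  // offsets still present in the original log.
  def caughtUp(log: OffsetRange, move: OffsetRange): Boolean =
    move.smallestOffset <= log.smallestOffset &&
      move.largestOffset == log.largestOffset

  def main(args: Array[String]): Unit = {
    val log  = OffsetRange(smallestOffset = 1000L, largestOffset = 5000L)
    val move = OffsetRange(smallestOffset = 900L, largestOffset = 5000L)
    println(caughtUp(log, move)) // true: the move log covers everything still in log
  }
}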

>
> And to answer your question, yes topicPartition.log refers to
> topic-partition/segment.log.
>
> Thanks,
> Dong
>
> On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky 
> wrote:
>
>>  Hi,
>>
>>  We have a similar solution that has been working in production since
>>  2014. You can see it here:
>>  https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
>>  The idea is very simple:
>>  1. The disk balancer runs in a separate thread inside the scheduler pool.
>>  2. It does not touch empty partitions.
>>  3. Before it moves a partition it forcibly creates a new segment on the
>>  destination disk.
>>  4. It moves segment by segment from new to old.
>>  5. The Log class works with segments on both disks.
>>
>>  Your approach seems too complicated; moreover, it means that you have to
>>  patch different components of the system.
>>  Could you clarify what you mean by topicPartition.log? Is it
>>  topic-partition/segment.log?
>>
>>  12.01.2017, 21:47, "Dong Lin" :
>>  > Hi all,
>>  >
>>  > We created KIP-113: Support replicas movement between log directories.
>>  > Please find the KIP wiki in the link
>>  > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
>>  >
>>  > This KIP is related to KIP-112
>>  > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD):
>>  > Handle disk failure for JBOD. They are needed in order to support JBOD in
>>  > Kafka. Please help review the KIP. Your feedback is appreciated!
>>  >
>>  > Thanks,
>>  > Dong


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-13 Thread Dong Lin
Hey Alexey,

Thanks for your review and the alternative approach. Here is my
understanding of your patch. Kafka's background threads are used to move
data between replicas. When data movement is triggered, the log will be
rolled, the new logs will be put in the new directory, and background
threads will move segments from the old directory to the new directory.

It is important to note that KIP-112 is intended to work with KIP-113 to
support JBOD. I think your solution is definitely simpler and better under
the current Kafka implementation, where a broker will fail if any disk fails.
But I am not sure if we want to allow a broker to run with partial disk
failure. Let's say a replica is being moved from log_dir_old to
log_dir_new and then log_dir_old stops working due to disk failure. How
would your existing patch handle it? To make the scenario a bit more
complicated, let's say the broker is shut down, log_dir_old's disk fails,
and the broker starts. In this case the broker doesn't even know if log_dir_new
has all the data needed for this replica. It becomes a problem if the
broker is elected leader of this partition in this case.

The solution presented in the KIP attempts to handle it by replacing the
replica in an atomic fashion after the log in the new dir has fully
caught up with the log in the old dir. At any time the log can be considered
to exist in only one log directory.
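
A rough sketch of that atomic swap, with an AtomicReference standing in for the
broker's real bookkeeping; the names currentDir and swapToNewDir are made up for
this example and do not appear in the KIP:

import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch of the "replace the replica once caught up" step.
// The actual broker-side bookkeeping is far more involved; this only shows the shape.
object AtomicReplicaSwapSketch {

  final case class ReplicaLocation(logDir: String)

  private val currentDir = new AtomicReference(ReplicaLocation("/data/disk-old/topic-0"))

  // Called once the log in the new dir has fully caught up with the old dir.
  // After this point the log is considered to exist only in the new dir, and
  // the old dir's copy can be deleted asynchronously.
  def swapToNewDir(newDir: String): ReplicaLocation = {
    val next = ReplicaLocation(newDir)
    currentDir.set(next)
    next
  }

  def main(args: Array[String]): Unit = {
    println(currentDir.get())
    println(swapToNewDir("/data/disk-new/topic-0"))
  }
}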

And to answer your question, yes topicPartition.log refers to
topic-partition/segment.log.

Thanks,
Dong




On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky 
wrote:

> Hi,
>
> We have a similar solution that has been working in production since
> 2014. You can see it here:
> https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
> The idea is very simple:
> 1. The disk balancer runs in a separate thread inside the scheduler pool.
> 2. It does not touch empty partitions.
> 3. Before it moves a partition it forcibly creates a new segment on the
> destination disk.
> 4. It moves segment by segment from new to old.
> 5. The Log class works with segments on both disks.
>
> Your approach seems too complicated; moreover, it means that you have to
> patch different components of the system.
> Could you clarify what you mean by topicPartition.log? Is it
> topic-partition/segment.log?
>
> 12.01.2017, 21:47, "Dong Lin" :
> > Hi all,
> >
> > We created KIP-113: Support replicas movement between log directories.
> > Please find the KIP wiki in the link
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
> >
> > This KIP is related to KIP-112
> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD):
> > Handle disk failure for JBOD. They are needed in order to support JBOD in
> > Kafka. Please help review the KIP. Your feedback is appreciated!
> >
> > Thanks,
> > Dong
>


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-13 Thread Alexey Ozeritsky
Hi,

We have a similar solution that has been working in production since 2014.
You can see it here:
https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
The idea is very simple:
1. The disk balancer runs in a separate thread inside the scheduler pool.
2. It does not touch empty partitions.
3. Before it moves a partition it forcibly creates a new segment on the
destination disk.
4. It moves segment by segment from new to old.
5. The Log class works with segments on both disks.
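
For a concrete (and heavily simplified, purely hypothetical) picture of that design,
here is a sketch of a scheduled task that moves one segment file at a time; the
actual patch also handles index files, locking, and integration with the Log class:

import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical sketch of a disk balancer running in a scheduler pool and moving
// one segment file per run from the source log dir to the destination log dir.
object DiskBalancerSketch {

  def moveOneSegment(srcDir: Path, dstDir: Path): Unit = {
    val stream = Files.newDirectoryStream(srcDir, "*.log")
    try {
      val it = stream.iterator()
      if (it.hasNext) {
        val segment = it.next()
        Files.createDirectories(dstDir)
        // Copy+delete across disks; a real balancer would also move index files.
        Files.move(segment, dstDir.resolve(segment.getFileName))
      }
    } finally stream.close()
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val src = Paths.get("/data/disk-old/topic-0")
    val dst = Paths.get("/data/disk-new/topic-0")
    // Run periodically, like a background task in a scheduler pool.
    val task = new Runnable { def run(): Unit = moveOneSegment(src, dst) }
    scheduler.scheduleWithFixedDelay(task, 0L, 30L, TimeUnit.SECONDS)
  }
}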

Your approach seems too complicated; moreover, it means that you have to patch
different components of the system.
Could you clarify what you mean by topicPartition.log? Is it
topic-partition/segment.log?

12.01.2017, 21:47, "Dong Lin" :
> Hi all,
>
> We created KIP-113: Support replicas movement between log directories.
> Please find the KIP wiki in the link
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
>
> This KIP is related to KIP-112
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD):
> Handle disk failure for JBOD. They are needed in order to support JBOD in
> Kafka. Please help review the KIP. Your feedback is appreciated!
>
> Thanks,
> Dong


[DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-12 Thread Dong Lin
Hi all,

We created KIP-113: Support replicas movement between log directories.
Please find the KIP wiki in the link
https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories

This KIP is related to KIP-112
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD):
Handle disk failure for JBOD. They are needed in order to support JBOD in
Kafka. Please help review the KIP. Your feedback is appreciated!

Thanks,
Dong